Index: test/org/apache/lucene/index/TestIndexWriterConcurrentMergeMerging.java =================================================================== --- test/org/apache/lucene/index/TestIndexWriterConcurrentMergeMerging.java (revision 0) +++ test/org/apache/lucene/index/TestIndexWriterConcurrentMergeMerging.java (revision 0) @@ -0,0 +1,107 @@ +package org.apache.lucene.index; +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import junit.framework.TestCase; + +import java.io.IOException; + + +public class TestIndexWriterConcurrentMergeMerging extends TestCase +{ + + /** + * Tests that index merging (specifically addIndexes()) doesn't + * change the index order of documents. + */ + public void testLucene() throws IOException + { + + int num=100; + + Directory indexA = new RAMDirectory(); + Directory indexB = new RAMDirectory(); + + fillIndex(indexA, 0, num); + boolean fail = verifyIndex(indexA, 0); + if (fail) + { + fail("Index a is invalid"); + } + + fillIndex(indexB, num, num); + fail = verifyIndex(indexB, num); + if (fail) + { + fail("Index b is invalid"); + } + + Directory merged = new RAMDirectory(); + + IndexWriterConcurrentMerge writer = new IndexWriterConcurrentMerge(merged, new StandardAnalyzer(), true); + writer.setMergeFactor(2); + + writer.addIndexes(new Directory[]{indexA, indexB}); + writer.close(); + + fail = verifyIndex(merged, 0); + merged.close(); + + assertFalse("The merged index is invalid", fail); + } + + private boolean verifyIndex(Directory directory, int startAt) throws IOException + { + boolean fail = false; + IndexReader reader = IndexReader.open(directory); + + int max = reader.maxDoc(); + for (int i = 0; i < max; i++) + { + Document temp = reader.document(i); + //System.out.println("doc "+i+"="+temp.getField("count").stringValue()); + //compare the index doc number to the value that it should be + if (!temp.getField("count").stringValue().equals((i + startAt) + "")) + { + fail = true; + System.out.println("Document " + (i + startAt) + " is returning document " + temp.getField("count").stringValue()); + } + } + return fail; + } + + private void fillIndex(Directory dir, int start, int numDocs) throws IOException + { + + IndexWriterConcurrentMerge writer = new IndexWriterConcurrentMerge(dir, new StandardAnalyzer(), true); + writer.setMergeFactor(2); + writer.setMaxBufferedDocs(2); + + for (int i = start; i < (start + numDocs); i++) + { + Document temp = new Document(); + temp.add(new Field("count", (""+i), Field.Store.YES, Field.Index.UN_TOKENIZED)); + + writer.addDocument(temp); + } + writer.close(); + } +} Index: test/org/apache/lucene/index/TestIndexWriterConcurrentMergeDelete.java =================================================================== --- 
test/org/apache/lucene/index/TestIndexWriterConcurrentMergeDelete.java (revision 0) +++ test/org/apache/lucene/index/TestIndexWriterConcurrentMergeDelete.java (revision 0) @@ -0,0 +1,539 @@ +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.Arrays; + +import junit.framework.TestCase; + +import org.apache.lucene.analysis.WhitespaceAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.search.Hits; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MockRAMDirectory; +import org.apache.lucene.store.RAMDirectory; + +public class TestIndexWriterConcurrentMergeDelete extends TestCase { + private static final long SLEEP_MILLI_SECONDS = 200; + + // test the simple case + public void testSimpleCase() throws IOException { + String[] keywords = { "1", "2" }; + String[] unindexed = { "Netherlands", "Italy" }; + String[] unstored = { "Amsterdam has lots of bridges", + "Venice has lots of canals" }; + String[] text = { "Amsterdam", "Venice" }; + + for(int pass=0;pass<2;pass++) { + boolean autoCommit = (0==pass); + + Directory dir = new RAMDirectory(); + IndexWriterConcurrentMerge modifier = new IndexWriterConcurrentMerge(dir, autoCommit, + new WhitespaceAnalyzer(), true); + modifier.setUseCompoundFile(true); + modifier.setMaxBufferedDeleteTerms(1); + + for (int i = 0; i < keywords.length; i++) { + Document doc = new Document(); + doc.add(new Field("id", keywords[i], Field.Store.YES, + Field.Index.UN_TOKENIZED)); + doc.add(new Field("country", unindexed[i], Field.Store.YES, + Field.Index.NO)); + doc.add(new Field("contents", unstored[i], Field.Store.NO, + Field.Index.TOKENIZED)); + doc + .add(new Field("city", text[i], Field.Store.YES, + Field.Index.TOKENIZED)); + modifier.addDocument(doc); + } + modifier.optimize(); + + if (!autoCommit) { + modifier.close(); + } + + Term term = new Term("city", "Amsterdam"); + int hitCount = getHitCount(dir, term); + assertEquals(1, hitCount); + if (!autoCommit) { + modifier = new IndexWriterConcurrentMerge(dir, autoCommit, new WhitespaceAnalyzer()); + modifier.setUseCompoundFile(true); + } + modifier.deleteDocuments(term); + if (!autoCommit) { + modifier.close(); + } else { + try { Thread.sleep(SLEEP_MILLI_SECONDS); } catch (InterruptedException e) {} + } + hitCount = getHitCount(dir, term); + assertEquals(0, hitCount); + + if (autoCommit) { + modifier.close(); + } + dir.close(); + } + } + + // test when delete terms only apply to disk segments + public void testNonRAMDelete() throws IOException { + for(int pass=0;pass<2;pass++) { + boolean autoCommit = (0==pass); + + Directory dir = new RAMDirectory(); + IndexWriterConcurrentMerge modifier = new IndexWriterConcurrentMerge(dir, autoCommit, + new WhitespaceAnalyzer(), true); + modifier.setMaxBufferedDocs(2); + modifier.setMaxBufferedDeleteTerms(2); + + int id = 0; + int value = 100; + + for (int i = 0; i < 7; i++) { + addDoc(modifier, ++id, value); + } + modifier.flush(); + + assertEquals(0, modifier.getRamSegmentCount()); + assertTrue(0 < modifier.getSegmentCount()); + + if (!autoCommit) { + modifier.close(); + } + + IndexReader reader = IndexReader.open(dir); + assertEquals(7, reader.numDocs()); + reader.close(); + + if (!autoCommit) { + modifier = new IndexWriterConcurrentMerge(dir, autoCommit, new WhitespaceAnalyzer()); + modifier.setMaxBufferedDocs(2); + modifier.setMaxBufferedDeleteTerms(2); + } + + 
modifier.deleteDocuments(new Term("value", String.valueOf(value))); + modifier.deleteDocuments(new Term("value", String.valueOf(value))); + + if (!autoCommit) { + modifier.close(); + } else { + try { Thread.sleep(SLEEP_MILLI_SECONDS); } catch (InterruptedException e) {} + } + + reader = IndexReader.open(dir); + assertEquals(0, reader.numDocs()); + reader.close(); + if (autoCommit) { + modifier.close(); + } + dir.close(); + } + } + + // test when delete terms only apply to ram segments + public void testRAMDeletes() throws IOException { + for(int pass=0;pass<2;pass++) { + boolean autoCommit = (0==pass); + Directory dir = new RAMDirectory(); + IndexWriterConcurrentMerge modifier = new IndexWriterConcurrentMerge(dir, autoCommit, + new WhitespaceAnalyzer(), true); + modifier.setMaxBufferedDocs(4); + modifier.setMaxBufferedDeleteTerms(4); + + int id = 0; + int value = 100; + + addDoc(modifier, ++id, value); + modifier.deleteDocuments(new Term("value", String.valueOf(value))); + addDoc(modifier, ++id, value); + modifier.deleteDocuments(new Term("value", String.valueOf(value))); + + assertEquals(2, modifier.getNumBufferedDeleteTerms()); + assertEquals(1, modifier.getBufferedDeleteTermsSize()); + + addDoc(modifier, ++id, value); + assertEquals(0, modifier.getSegmentCount()); + modifier.flush(); + + if (!autoCommit) { + modifier.close(); + } + + IndexReader reader = IndexReader.open(dir); + assertEquals(1, reader.numDocs()); + + int hitCount = getHitCount(dir, new Term("id", String.valueOf(id))); + assertEquals(1, hitCount); + reader.close(); + if (autoCommit) { + modifier.close(); + } + dir.close(); + } + } + + // test when delete terms apply to both disk and ram segments + public void testBothDeletes() throws IOException { + for(int pass=0;pass<2;pass++) { + boolean autoCommit = (0==pass); + + Directory dir = new RAMDirectory(); + IndexWriterConcurrentMerge modifier = new IndexWriterConcurrentMerge(dir, autoCommit, + new WhitespaceAnalyzer(), true); + modifier.setMaxBufferedDocs(100); + modifier.setMaxBufferedDeleteTerms(100); + + int id = 0; + int value = 100; + + for (int i = 0; i < 5; i++) { + addDoc(modifier, ++id, value); + } + + value = 200; + for (int i = 0; i < 5; i++) { + addDoc(modifier, ++id, value); + } + modifier.flush(); + + for (int i = 0; i < 5; i++) { + addDoc(modifier, ++id, value); + } + modifier.deleteDocuments(new Term("value", String.valueOf(value))); + + modifier.flush(); + if (!autoCommit) { + modifier.close(); + } + + IndexReader reader = IndexReader.open(dir); + assertEquals(5, reader.numDocs()); + if (autoCommit) { + modifier.close(); + } + } + } + + // test that batched delete terms are flushed together + public void testBatchDeletes() throws IOException { + for(int pass=0;pass<2;pass++) { + boolean autoCommit = (0==pass); + Directory dir = new RAMDirectory(); + IndexWriterConcurrentMerge modifier = new IndexWriterConcurrentMerge(dir, autoCommit, + new WhitespaceAnalyzer(), true); + modifier.setMaxBufferedDocs(2); + modifier.setMaxBufferedDeleteTerms(2); + + int id = 0; + int value = 100; + + for (int i = 0; i < 7; i++) { + addDoc(modifier, ++id, value); + } + modifier.flush(); + if (!autoCommit) { + modifier.close(); + } + + IndexReader reader = IndexReader.open(dir); + assertEquals(7, reader.numDocs()); + reader.close(); + + if (!autoCommit) { + modifier = new IndexWriterConcurrentMerge(dir, autoCommit, + new WhitespaceAnalyzer()); + modifier.setMaxBufferedDocs(2); + modifier.setMaxBufferedDeleteTerms(2); + } + + id = 0; + modifier.deleteDocuments(new Term("id", 
String.valueOf(++id))); + modifier.deleteDocuments(new Term("id", String.valueOf(++id))); + + if (!autoCommit) { + modifier.close(); + } else { + try { Thread.sleep(SLEEP_MILLI_SECONDS); } catch (InterruptedException e) {} + } + + reader = IndexReader.open(dir); + assertEquals(5, reader.numDocs()); + reader.close(); + + Term[] terms = new Term[3]; + for (int i = 0; i < terms.length; i++) { + terms[i] = new Term("id", String.valueOf(++id)); + } + if (!autoCommit) { + modifier = new IndexWriterConcurrentMerge(dir, autoCommit, + new WhitespaceAnalyzer()); + modifier.setMaxBufferedDocs(2); + modifier.setMaxBufferedDeleteTerms(2); + } + modifier.deleteDocuments(terms); + if (!autoCommit) { + modifier.close(); + } else { + try { Thread.sleep(SLEEP_MILLI_SECONDS); } catch (InterruptedException e) {} + } + reader = IndexReader.open(dir); + assertEquals(2, reader.numDocs()); + reader.close(); + + if (autoCommit) { + modifier.close(); + } + dir.close(); + } + } + + private void addDoc(IndexWriterConcurrentMerge modifier, int id, int value) + throws IOException { + Document doc = new Document(); + doc.add(new Field("content", "aaa", Field.Store.NO, Field.Index.TOKENIZED)); + doc.add(new Field("id", String.valueOf(id), Field.Store.YES, + Field.Index.UN_TOKENIZED)); + doc.add(new Field("value", String.valueOf(value), Field.Store.NO, + Field.Index.UN_TOKENIZED)); + modifier.addDocument(doc); + } + + private int getHitCount(Directory dir, Term term) throws IOException { + IndexSearcher searcher = new IndexSearcher(dir); + int hitCount = searcher.search(new TermQuery(term)).length(); + searcher.close(); + return hitCount; + } + + public void testDeletesOnDiskFull() throws IOException { + testOperationsOnDiskFull(false); + } + + public void testUpdatesOnDiskFull() throws IOException { + testOperationsOnDiskFull(true); + } + + /** + * Make sure if modifier tries to commit but hits disk full that modifier + * remains consistent and usable. Similar to TestIndexReader.testDiskFull(). 
+ */ + private void testOperationsOnDiskFull(boolean updates) throws IOException { + + boolean debug = false; + Term searchTerm = new Term("content", "aaa"); + int START_COUNT = 157; + int END_COUNT = 144; + + for(int pass=0;pass<2;pass++) { + boolean autoCommit = (0==pass); + + // First build up a starting index: + RAMDirectory startDir = new RAMDirectory(); + IndexWriterConcurrentMerge writer = new IndexWriterConcurrentMerge(startDir, autoCommit, + new WhitespaceAnalyzer(), true); + for (int i = 0; i < 157; i++) { + Document d = new Document(); + d.add(new Field("id", Integer.toString(i), Field.Store.YES, + Field.Index.UN_TOKENIZED)); + d.add(new Field("content", "aaa " + i, Field.Store.NO, + Field.Index.TOKENIZED)); + writer.addDocument(d); + } + writer.close(); + + long diskUsage = startDir.sizeInBytes(); + long diskFree = diskUsage + 10; + + IOException err = null; + + boolean done = false; + + // Iterate w/ ever increasing free disk space: + while (!done) { + MockRAMDirectory dir = new MockRAMDirectory(startDir); + IndexWriterConcurrentMerge modifier = new IndexWriterConcurrentMerge(dir, autoCommit, + new WhitespaceAnalyzer()); + + modifier.setMaxBufferedDocs(1000); // use flush or close + modifier.setMaxBufferedDeleteTerms(1000); // use flush or close + + // For each disk size, first try to commit against + // dir that will hit random IOExceptions & disk + // full; after, give it infinite disk space & turn + // off random IOExceptions & retry w/ same reader: + boolean success = false; + + for (int x = 0; x < 2; x++) { + + double rate = 0.1; + double diskRatio = ((double)diskFree) / diskUsage; + long thisDiskFree; + String testName; + + if (0 == x) { + thisDiskFree = diskFree; + if (diskRatio >= 2.0) { + rate /= 2; + } + if (diskRatio >= 4.0) { + rate /= 2; + } + if (diskRatio >= 6.0) { + rate = 0.0; + } + if (debug) { + System.out.println("\ncycle: " + diskFree + " bytes"); + } + testName = "disk full during reader.close() @ " + thisDiskFree + + " bytes"; + } else { + thisDiskFree = 0; + rate = 0.0; + if (debug) { + System.out.println("\ncycle: same writer: unlimited disk space"); + } + testName = "reader re-use after disk full"; + } + + dir.setMaxSizeInBytes(thisDiskFree); + dir.setRandomIOExceptionRate(rate, diskFree); + + try { + if (0 == x) { + int docId = 12; + for (int i = 0; i < 13; i++) { + if (updates) { + Document d = new Document(); + d.add(new Field("id", Integer.toString(i), Field.Store.YES, + Field.Index.UN_TOKENIZED)); + d.add(new Field("content", "bbb " + i, Field.Store.NO, + Field.Index.TOKENIZED)); + modifier.updateDocument(new Term("id", Integer.toString(docId)), d); + } else { // deletes + modifier.deleteDocuments(new Term("id", Integer.toString(docId))); + // modifier.setNorm(docId, "contents", (float)2.0); + } + docId += 12; + } + } + modifier.close(); + success = true; + if (0 == x) { + done = true; + } + } + catch (IOException e) { + if (debug) { + System.out.println(" hit IOException: " + e); + } + err = e; + if (1 == x) { + e.printStackTrace(); + fail(testName + " hit IOException after disk space was freed up"); + } + } + + // Whether we succeeded or failed, check that all + // un-referenced files were in fact deleted (ie, + // we did not create garbage). 
Just create a + // new IndexFileDeleter, have it delete + // unreferenced files, then verify that in fact + // no files were deleted: + String[] startFiles = dir.list(); + SegmentInfos infos = new SegmentInfos(); + infos.read(dir); + IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null); + String[] endFiles = dir.list(); + + Arrays.sort(startFiles); + Arrays.sort(endFiles); + + // for(int i=0;i 0) { + s += "\n "; + } + s += l[i]; + } + return s; + } +} Index: test/org/apache/lucene/index/TestIndexWriterConcurrentMergeMergePolicy.java =================================================================== --- test/org/apache/lucene/index/TestIndexWriterConcurrentMergeMergePolicy.java (revision 0) +++ test/org/apache/lucene/index/TestIndexWriterConcurrentMergeMergePolicy.java (revision 0) @@ -0,0 +1,233 @@ +package org.apache.lucene.index; + +import java.io.IOException; + +import org.apache.lucene.analysis.WhitespaceAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.RAMDirectory; + +import junit.framework.TestCase; + +public class TestIndexWriterConcurrentMergeMergePolicy extends TestCase { + private static final long SLEEP_MILLI_SECONDS = 200; + + // Test the normal case + public void testNormalCase() throws IOException { + Directory dir = new RAMDirectory(); + + IndexWriterConcurrentMerge writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); + writer.setMergeFactor(10); + + for (int i = 0; i < 100; i++) { + addDoc(writer); + checkInvariants(writer); + } + + writer.close(); + } + + // Test to see if there is over merge + public void testNoOverMerge() throws IOException { + Directory dir = new RAMDirectory(); + + IndexWriterConcurrentMerge writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); + writer.setMergeFactor(10); + + boolean noOverMerge = false; + for (int i = 0; i < 100; i++) { + addDoc(writer); + checkInvariants(writer); + if (writer.getRamSegmentCount() + writer.getSegmentCount() >= 18) { + noOverMerge = true; + } + } + assertTrue(noOverMerge); + + writer.close(); + } + + // Test the case where flush is forced after every addDoc + public void testForceFlush() throws IOException { + Directory dir = new RAMDirectory(); + + IndexWriterConcurrentMerge writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); + writer.setMergeFactor(10); + + for (int i = 0; i < 100; i++) { + addDoc(writer); + writer.close(); + + writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), false); + writer.setMaxBufferedDocs(10); + writer.setMergeFactor(10); + checkInvariants(writer); + } + + writer.close(); + } + + // Test the case where mergeFactor changes + public void testMergeFactorChange() throws IOException { + Directory dir = new RAMDirectory(); + + IndexWriterConcurrentMerge writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); + writer.setMergeFactor(100); + + for (int i = 0; i < 250; i++) { + addDoc(writer); + checkInvariants(writer); + } + + writer.setMergeFactor(5); + + // merge policy only fixes segments on levels where merges + // have been triggered, so check invariants after all adds + for (int i = 0; i < 10; i++) { + addDoc(writer); + } + checkInvariants(writer); + + writer.close(); + } + + // 
Test the case where both mergeFactor and maxBufferedDocs change + public void testMaxBufferedDocsChange() throws IOException { + Directory dir = new RAMDirectory(); + + IndexWriterConcurrentMerge writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(101); + writer.setMergeFactor(101); + + // leftmost* segment has 1 doc + // rightmost* segment has 100 docs + for (int i = 1; i <= 100; i++) { + for (int j = 0; j < i; j++) { + addDoc(writer); + } + checkInvariants(writer); + writer.close(); + + writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), false); + writer.setMaxBufferedDocs(101); + writer.setMergeFactor(101); + } + + writer.setMaxBufferedDocs(10); + writer.setMergeFactor(10); + + // merge policy only fixes segments on levels where merges + // have been triggered, so check invariants after all adds + for (int i = 0; i < 100; i++) { + addDoc(writer); + } + try { Thread.sleep(SLEEP_MILLI_SECONDS); } catch (InterruptedException e) {} + checkInvariants(writer); + + for (int i = 100; i < 1000; i++) { + addDoc(writer); + } + checkInvariants(writer); + + writer.close(); + } + + // Test the case where a merge results in no doc at all + public void testMergeDocCount0() throws IOException { + Directory dir = new RAMDirectory(); + + IndexWriterConcurrentMerge writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); + writer.setMergeFactor(100); + + for (int i = 0; i < 250; i++) { + addDoc(writer); + checkInvariants(writer); + } + writer.close(); + + IndexReader reader = IndexReader.open(dir); + reader.deleteDocuments(new Term("content", "aaa")); + reader.close(); + + writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), false); + writer.setMaxBufferedDocs(10); + writer.setMergeFactor(5); + + // merge factor is changed, so check invariants after all adds + for (int i = 0; i < 10; i++) { + addDoc(writer); + } + checkInvariants(writer); + assertEquals(10, writer.docCount()); + + writer.close(); + } + + private void addDoc(IndexWriterConcurrentMerge writer) throws IOException { + Document doc = new Document(); + doc.add(new Field("content", "aaa", Field.Store.NO, Field.Index.TOKENIZED)); + writer.addDocument(doc); + } + + private void checkInvariants(IndexWriterConcurrentMerge writer) throws IOException { + try { Thread.sleep(SLEEP_MILLI_SECONDS); } catch (InterruptedException e) {} + + int maxBufferedDocs = writer.getMaxBufferedDocs(); + int mergeFactor = writer.getMergeFactor(); + int maxMergeDocs = writer.getMaxMergeDocs(); + + int ramSegmentCount = writer.getRamSegmentCount(); + assertTrue(ramSegmentCount < maxBufferedDocs); + + int lowerBound = -1; + int upperBound = maxBufferedDocs; + int numSegments = 0; + + int segmentCount = writer.getSegmentCount(); + for (int i = segmentCount - 1; i >= 0; i--) { + int docCount = writer.getDocCount(i); + assertTrue(docCount > lowerBound); + + if (docCount <= upperBound) { + numSegments++; + } else { + if (upperBound * mergeFactor <= maxMergeDocs) { + assertTrue(numSegments < mergeFactor); + } + + do { + lowerBound = upperBound; + upperBound *= mergeFactor; + } while (docCount > upperBound); + numSegments = 1; + } + } + if (upperBound * mergeFactor <= maxMergeDocs) { + assertTrue(numSegments < mergeFactor); + } + + String[] files = writer.getDirectory().list(); + int segmentCfsCount = 0; + for (int i = 0; i < files.length; i++) { + if (files[i].endsWith(".cfs")) { + segmentCfsCount++; + } + } + assertEquals(segmentCount, 
segmentCfsCount); + } + + private void printSegmentDocCounts(IndexWriterConcurrentMerge writer) { + int segmentCount = writer.getSegmentCount(); + System.out.println("" + segmentCount + " segments total"); + for (int i = 0; i < segmentCount; i++) { + System.out.println(" segment " + i + " has " + writer.getDocCount(i) + + " docs"); + } + } +} Index: test/org/apache/lucene/index/TestIndexWriterConcurrentMerge.java =================================================================== --- test/org/apache/lucene/index/TestIndexWriterConcurrentMerge.java (revision 0) +++ test/org/apache/lucene/index/TestIndexWriterConcurrentMerge.java (revision 0) @@ -0,0 +1,1054 @@ +package org.apache.lucene.index; + +import java.io.IOException; +import java.io.File; +import java.util.Arrays; + +import junit.framework.TestCase; + +import org.apache.lucene.analysis.WhitespaceAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriterConcurrentMerge; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Hits; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.AlreadyClosedException; + +import org.apache.lucene.store.MockRAMDirectory; +import org.apache.lucene.store.LockFactory; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.SingleInstanceLockFactory; + +public class TestIndexWriterConcurrentMerge extends TestCase +{ + public void testDocCount() throws IOException + { + Directory dir = new RAMDirectory(); + + IndexWriterConcurrentMerge writer = null; + IndexReader reader = null; + int i; + + IndexWriterConcurrentMerge.setDefaultWriteLockTimeout(2000); + assertEquals(2000, IndexWriterConcurrentMerge.getDefaultWriteLockTimeout()); + + writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer()); + + IndexWriterConcurrentMerge.setDefaultWriteLockTimeout(1000); + + // add 100 documents + for (i = 0; i < 100; i++) { + addDoc(writer); + } + assertEquals(100, writer.docCount()); + writer.close(); + + // delete 40 documents + reader = IndexReader.open(dir); + for (i = 0; i < 40; i++) { + reader.deleteDocument(i); + } + reader.close(); + + // test doc count before segments are merged/index is optimized + writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer()); + assertEquals(100, writer.docCount()); + writer.close(); + + reader = IndexReader.open(dir); + assertEquals(100, reader.maxDoc()); + assertEquals(60, reader.numDocs()); + reader.close(); + + // optimize the index and check that the new doc count is correct + writer = new IndexWriterConcurrentMerge(dir, true, new WhitespaceAnalyzer()); + writer.optimize(); + assertEquals(60, writer.docCount()); + writer.close(); + + // check that the index reader gives the same numbers. 
+ reader = IndexReader.open(dir); + assertEquals(60, reader.maxDoc()); + assertEquals(60, reader.numDocs()); + reader.close(); + + // make sure opening a new index for create over + // this existing one works correctly: + writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), true); + assertEquals(0, writer.docCount()); + writer.close(); + } + + private void addDoc(IndexWriterConcurrentMerge writer) throws IOException + { + Document doc = new Document(); + doc.add(new Field("content", "aaa", Field.Store.NO, Field.Index.TOKENIZED)); + writer.addDocument(doc); + } + + private void addDocWithIndex(IndexWriterConcurrentMerge writer, int index) throws IOException + { + Document doc = new Document(); + doc.add(new Field("content", "aaa " + index, Field.Store.YES, Field.Index.TOKENIZED)); + doc.add(new Field("id", "" + index, Field.Store.YES, Field.Index.TOKENIZED)); + writer.addDocument(doc); + } + + /* + Test: make sure when we run out of disk space or hit + random IOExceptions in any of the addIndexes(*) calls + that 1) index is not corrupt (searcher can open/search + it) and 2) transactional semantics are followed: + either all or none of the incoming documents were in + fact added. + */ + public void testAddIndexOnDiskFull() throws IOException + { + + int START_COUNT = 57; + int NUM_DIR = 50; + int END_COUNT = START_COUNT + NUM_DIR*25; + + boolean debug = false; + + // Build up a bunch of dirs that have indexes which we + // will then merge together by calling addIndexes(*): + Directory[] dirs = new Directory[NUM_DIR]; + long inputDiskUsage = 0; + for(int i=0;i= 2.0) { + rate /= 2; + } + if (diskRatio >= 4.0) { + rate /= 2; + } + if (diskRatio >= 6.0) { + rate = 0.0; + } + if (debug) + testName = "disk full test " + methodName + " with disk full at " + diskFree + " bytes autoCommit=" + autoCommit; + } else { + thisDiskFree = 0; + rate = 0.0; + if (debug) + testName = "disk full test " + methodName + " with unlimited disk space autoCommit=" + autoCommit; + } + + if (debug) + System.out.println("\ncycle: " + testName); + + dir.setMaxSizeInBytes(thisDiskFree); + dir.setRandomIOExceptionRate(rate, diskFree); + + try { + + if (0 == method) { + writer.addIndexes(dirs); + } else if (1 == method) { + IndexReader readers[] = new IndexReader[dirs.length]; + for(int i=0;i 0) { + s += "\n "; + } + s += l[i]; + } + return s; + } + + // Make sure we can open an index for create even when a + // reader holds it open (this fails pre lock-less + // commits on windows): + public void testCreateWithReader() throws IOException { + String tempDir = System.getProperty("java.io.tmpdir"); + if (tempDir == null) + throw new IOException("java.io.tmpdir undefined, cannot run test"); + File indexDir = new File(tempDir, "lucenetestindexwriter"); + + try { + Directory dir = FSDirectory.getDirectory(indexDir); + + // add one document & close writer + IndexWriterConcurrentMerge writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), true); + addDoc(writer); + writer.close(); + + // now open reader: + IndexReader reader = IndexReader.open(dir); + assertEquals("should be one document", reader.numDocs(), 1); + + // now open index for create: + writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), true); + assertEquals("should be zero documents", writer.docCount(), 0); + addDoc(writer); + writer.close(); + + assertEquals("should be one document", reader.numDocs(), 1); + IndexReader reader2 = IndexReader.open(dir); + assertEquals("should be one document", reader2.numDocs(), 1); + 
reader.close(); + reader2.close(); + } finally { + rmDir(indexDir); + } + } + + + // Same test as above, but use IndexWriterConcurrentMerge constructor + // that takes File: + public void testCreateWithReader2() throws IOException { + String tempDir = System.getProperty("java.io.tmpdir"); + if (tempDir == null) + throw new IOException("java.io.tmpdir undefined, cannot run test"); + File indexDir = new File(tempDir, "lucenetestindexwriter"); + try { + // add one document & close writer + IndexWriterConcurrentMerge writer = new IndexWriterConcurrentMerge(indexDir, new WhitespaceAnalyzer(), true); + addDoc(writer); + writer.close(); + + // now open reader: + IndexReader reader = IndexReader.open(indexDir); + assertEquals("should be one document", reader.numDocs(), 1); + + // now open index for create: + writer = new IndexWriterConcurrentMerge(indexDir, new WhitespaceAnalyzer(), true); + assertEquals("should be zero documents", writer.docCount(), 0); + addDoc(writer); + writer.close(); + + assertEquals("should be one document", reader.numDocs(), 1); + IndexReader reader2 = IndexReader.open(indexDir); + assertEquals("should be one document", reader2.numDocs(), 1); + reader.close(); + reader2.close(); + } finally { + rmDir(indexDir); + } + } + + // Same test as above, but use IndexWriterConcurrentMerge constructor + // that takes String: + public void testCreateWithReader3() throws IOException { + String tempDir = System.getProperty("tempDir"); + if (tempDir == null) + throw new IOException("java.io.tmpdir undefined, cannot run test"); + + String dirName = tempDir + "/lucenetestindexwriter"; + try { + + // add one document & close writer + IndexWriterConcurrentMerge writer = new IndexWriterConcurrentMerge(dirName, new WhitespaceAnalyzer(), true); + addDoc(writer); + writer.close(); + + // now open reader: + IndexReader reader = IndexReader.open(dirName); + assertEquals("should be one document", reader.numDocs(), 1); + + // now open index for create: + writer = new IndexWriterConcurrentMerge(dirName, new WhitespaceAnalyzer(), true); + assertEquals("should be zero documents", writer.docCount(), 0); + addDoc(writer); + writer.close(); + + assertEquals("should be one document", reader.numDocs(), 1); + IndexReader reader2 = IndexReader.open(dirName); + assertEquals("should be one document", reader2.numDocs(), 1); + reader.close(); + reader2.close(); + } finally { + rmDir(new File(dirName)); + } + } + + // Simulate a writer that crashed while writing segments + // file: make sure we can still open the index (ie, + // gracefully fallback to the previous segments file), + // and that we can add to the index: + public void testSimulatedCrashedWriter() throws IOException { + Directory dir = new RAMDirectory(); + + IndexWriterConcurrentMerge writer = null; + + writer = new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), true); + + // add 100 documents + for (int i = 0; i < 100; i++) { + addDoc(writer); + } + + // close + writer.close(); + + long gen = SegmentInfos.getCurrentSegmentGeneration(dir); + assertTrue("segment generation should be > 1 but got " + gen, gen > 1); + + // Make the next segments file, with last byte + // missing, to simulate a writer that crashed while + // writing segments file: + String fileNameIn = SegmentInfos.getCurrentSegmentFileName(dir); + String fileNameOut = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, + "", + 1+gen); + IndexInput in = dir.openInput(fileNameIn); + IndexOutput out = dir.createOutput(fileNameOut); + long length = in.length(); + 
for(int i=0;i 1 but got " + gen, gen > 1); + + String fileNameIn = SegmentInfos.getCurrentSegmentFileName(dir); + String fileNameOut = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, + "", + 1+gen); + IndexInput in = dir.openInput(fileNameIn); + IndexOutput out = dir.createOutput(fileNameOut); + long length = in.length(); + for(int i=0;i 1 but got " + gen, gen > 1); + + String[] files = dir.list(); + for(int i=0;i= maxMergeDocs) { + lowerBound = STARTING_BOUND; + upperBound = STARTING_BOUND; + mergedAtCurrLevel = false; + } + + if (upperBound == STARTING_BOUND) { + if (ramSegmentInfos.size() >= maxBufferedDocs + || numBufferedDeleteTerms >= maxBufferedDeleteTerms) { + lowerBound = STARTING_BOUND; + upperBound = maxBufferedDocs; + mergedAtCurrLevel = false; + + List segments = new ArrayList(); + segments.addAll(ramSegmentInfos); + spec = new MergeSpecification(segments, true); + } + } else { + + int minSegment = segmentInfos.size(); + int maxSegment = -1; + + // find merge-worthy segments + while (--minSegment >= 0) { + SegmentInfo si = segmentInfos.info(minSegment); + + if (maxSegment == -1 && si.docCount > lowerBound + && si.docCount <= upperBound) { + // start from the rightmost* segment whose doc count is in bounds + maxSegment = minSegment; + } else if (si.docCount > upperBound) { + // until the segment whose doc count exceeds upperBound + break; + } + } + + minSegment++; + maxSegment++; + int numSegments = maxSegment - minSegment; + + if (numSegments < mergeFactor) { + if (!mergedAtCurrLevel) { + lowerBound = STARTING_BOUND; + upperBound = STARTING_BOUND; + mergedAtCurrLevel = false; + } else { + lowerBound = upperBound; + upperBound *= mergeFactor; + mergedAtCurrLevel = false; + spec = select(segmentInfos, ramSegmentInfos, numBufferedDeleteTerms); + } + } else { + // number of merge-worthy segments may exceed mergeFactor when + // mergeFactor and/or maxBufferedDocs change(s) + if (numSegments == mergeFactor) { + lowerBound = upperBound; + upperBound *= mergeFactor; + mergedAtCurrLevel = false; + } else { + mergedAtCurrLevel = true; + } + + List segments = new ArrayList(); + for (int i = minSegment; i < minSegment + mergeFactor; i++) { + segments.add(segmentInfos.get(i)); + } + spec = new MergeSpecification(segments, false); + } + } + return spec; + } +} Index: java/org/apache/lucene/index/IndexWriterConcurrentMerge.java =================================================================== --- java/org/apache/lucene/index/IndexWriterConcurrentMerge.java (revision 0) +++ java/org/apache/lucene/index/IndexWriterConcurrentMerge.java (revision 0) @@ -0,0 +1,387 @@ +package org.apache.lucene.index; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.LockObtainFailedException; + +public class IndexWriterConcurrentMerge extends IndexWriter { + public static final long DEFAULT_SLEEP_MILLI_SECONDS = 100; + private long sleepMilliSeconds = DEFAULT_SLEEP_MILLI_SECONDS; + + private MergeSelector selector; + private Merger merger; + private boolean mergeThreadRunning; + private boolean mergeInProgress; + private MergeThread mergeThread; + private IOException mergeException = null; + + public IndexWriterConcurrentMerge(String path, Analyzer a, boolean create) + throws CorruptIndexException, LockObtainFailedException, IOException { + super(path, a, create); + initMergeThread(); + } + + public 
IndexWriterConcurrentMerge(File path, Analyzer a, boolean create) + throws CorruptIndexException, LockObtainFailedException, IOException { + super(path, a, create); + initMergeThread(); + } + + public IndexWriterConcurrentMerge(Directory d, Analyzer a, boolean create) + throws CorruptIndexException, LockObtainFailedException, IOException { + super(d, a, create); + initMergeThread(); + } + + public IndexWriterConcurrentMerge(String path, Analyzer a) + throws CorruptIndexException, LockObtainFailedException, IOException { + super(path, a); + initMergeThread(); + } + + public IndexWriterConcurrentMerge(File path, Analyzer a) + throws CorruptIndexException, LockObtainFailedException, IOException { + super(path, a); + initMergeThread(); + } + + public IndexWriterConcurrentMerge(Directory d, Analyzer a) + throws CorruptIndexException, LockObtainFailedException, IOException { + super(d, a); + initMergeThread(); + } + + public IndexWriterConcurrentMerge(Directory d, boolean autoCommit, Analyzer a) + throws CorruptIndexException, LockObtainFailedException, IOException { + super(d, autoCommit, a); + initMergeThread(); + } + + public IndexWriterConcurrentMerge(Directory d, boolean autoCommit, + Analyzer a, boolean create) throws CorruptIndexException, + LockObtainFailedException, IOException { + super(d, autoCommit, a, create); + initMergeThread(); + } + + public IndexWriterConcurrentMerge(Directory d, boolean autoCommit, + Analyzer a, IndexDeletionPolicy deletionPolicy) + throws CorruptIndexException, LockObtainFailedException, IOException { + super(d, autoCommit, a, deletionPolicy); + initMergeThread(); + } + + public IndexWriterConcurrentMerge(Directory d, boolean autoCommit, + Analyzer a, boolean create, IndexDeletionPolicy deletionPolicy) + throws CorruptIndexException, LockObtainFailedException, IOException { + super(d, autoCommit, a, create, deletionPolicy); + initMergeThread(); + } + + private void initMergeThread() { + selector = new MergeSelector(this); + merger = new Merger(this); + mergeThread = new MergeThread(); + mergeThreadRunning = true; + mergeInProgress = false; + mergeThread.start(); + } + + public void setSleepMilliSeconds(long sleepMilliSeconds) { + this.sleepMilliSeconds = sleepMilliSeconds; + } + + public long getSleepMilliSeconds() { + return sleepMilliSeconds; + } + + public void close() throws IOException { + synchronized (this) { + if (closed) + return; + mergeThreadRunning = false; + } + + try { + mergeThread.join(); + } + catch (InterruptedException e) { + IOException ie = new IOException( + "Interrupted while waiting for merge thread to stop."); + ie.initCause(e); + throw ie; + } + + synchronized (this) { + super.close(); + checkMergeException(); + } + } + + public void addDocument(Document doc, Analyzer analyzer) + throws CorruptIndexException, IOException { + ensureOpen(); + SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer); + synchronized (this) { + waitIfBufferFull(); + ramSegmentInfos.addElement(newSegmentInfo); + } + } + + public synchronized void deleteDocuments(Term term) + throws CorruptIndexException, IOException { + ensureOpen(); + waitIfBufferFull(); + bufferDeleteTerm(term); + } + + public synchronized void deleteDocuments(Term[] terms) + throws CorruptIndexException, IOException { + ensureOpen(); + waitIfBufferFull(); + for (int i = 0; i < terms.length; i++) { + bufferDeleteTerm(terms[i]); + } + } + + public void updateDocument(Term term, Document doc, Analyzer analyzer) + throws CorruptIndexException, IOException { + ensureOpen(); + 
SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer); + synchronized (this) { + waitIfBufferFull(); + bufferDeleteTerm(term); + ramSegmentInfos.addElement(newSegmentInfo); + } + } + + public synchronized void optimize() throws CorruptIndexException, IOException { + waitIfMerging(); + super.optimize(); + } + + public synchronized void abort() throws IOException { + waitIfMerging(); + super.abort(); + checkMergeException(); + } + + public synchronized void addIndexes(Directory[] dirs) + throws CorruptIndexException, IOException { + waitIfMerging(); + super.addIndexes(dirs); + checkMergeException(); + } + + public synchronized void addIndexesNoOptimize(Directory[] dirs) + throws CorruptIndexException, IOException { + waitIfMerging(); + super.addIndexesNoOptimize(dirs); + checkMergeException(); + } + + public synchronized void addIndexes(IndexReader[] readers) + throws CorruptIndexException, IOException { + waitIfMerging(); + super.addIndexes(readers); + checkMergeException(); + } + + public synchronized void flush() throws CorruptIndexException, IOException { + waitIfMerging(); + super.flush(); + checkMergeException(); + } + + private void waitIfBufferFull() throws IOException { + try { + while (ramSegmentInfos.size() >= getMaxBufferedDocs() + || getNumBufferedDeleteTerms() >= getMaxBufferedDeleteTerms()) { + checkMergeException(); + wait(); + } + checkMergeException(); + } + catch (InterruptedException e) { + IOException ie = new IOException( + "Interrupted while waiting for a merge to finish."); + ie.initCause(e); + throw ie; + } + } + + private void waitIfMerging() throws IOException { + try { + while (mergeInProgress) { + wait(); + } + } + catch (InterruptedException e) { + IOException ie = new IOException( + "Interrupted while waiting for a merge to finish."); + ie.initCause(e); + throw ie; + } + } + + private synchronized void notifyAllAfterMerge() { + mergeInProgress = false; + notifyAll(); + } + + private synchronized MergeSelector.MergeSpecification selectSegmentsToMerge() { + MergeSelector.MergeSpecification spec = selector.select(segmentInfos, + ramSegmentInfos, getNumBufferedDeleteTerms()); + if (spec != null) { + // found a flush/merge + mergeInProgress = true; + } + return spec; + } + + private SegmentInfo mergeSelectedSegments( + MergeSelector.MergeSpecification spec) throws IOException { + return merger.merge(spec, newSegmentName()); + } + + private synchronized void replaceSelectedSegments( + MergeSelector.MergeSpecification spec, SegmentInfo newSegment) + throws IOException { + // We may be called solely because there are deletes + // pending, in which case doMerge is false: + boolean doMerge = spec.doMerge(); + boolean isRamFlush = spec.isRamFlush(); + boolean anyDeletes = (getBufferedDeleteTermsSize() != 0); + + SegmentInfos rollback = null; + boolean success = false; + + // This is try/finally to rollback our internal state + // if we hit exception when doing the merge: + try { + + if (!isRamFlush || anyDeletes) { + // Now save the SegmentInfo instances that + // we are replacing: + rollback = (SegmentInfos)segmentInfos.clone(); + } + + if (doMerge) { + if (isRamFlush) { + segmentInfos.addElement(newSegment); + } else { + List segments = spec.segments(); + for (int i = segments.size() - 1; i > 0; i--) { // remove old infos & + // add new + segmentInfos.remove(segments.get(i)); + } + + int minSegment = segmentInfos.indexOf(segments.get(0)); + segmentInfos.remove(segments.get(0)); + segmentInfos.add(minSegment, newSegment); + } + } + + if (isRamFlush) { + 
maybeApplyDeletes(doMerge);
+        doAfterFlush();
+      }
+
+      checkpoint();
+
+      success = true;
+
+    } finally {
+
+      if (success) {
+        // The non-ram-segments case is already committed
+        // (above), so all that remains for the ram segments
+        // case is to clear the ram segments:
+        if (isRamFlush) {
+          // same as ramSegmentInfos.removeAll(spec.segments()) here
+          ramSegmentInfos.removeAllElements();
+        }
+      } else {
+
+        // Must rollback so our state matches index:
+        if (isRamFlush && !anyDeletes) {
+          // Simple case: newSegment may or may not have
+          // been added to the end of our segment infos,
+          // so just check & remove if so:
+          if (newSegment != null && segmentInfos.size() > 0
+              && segmentInfos.info(segmentInfos.size() - 1) == newSegment) {
+            segmentInfos.remove(segmentInfos.size() - 1);
+          }
+        } else if (rollback != null) {
+          // Rollback the individual SegmentInfo
+          // instances, but keep original SegmentInfos
+          // instance (so we don't try to write again the
+          // same segments_N file -- write once):
+          segmentInfos.clear();
+          segmentInfos.addAll(rollback);
+        }
+
+        // Delete any partially created and now unreferenced files:
+        deleter.refresh();
+      }
+    }
+
+    // Delete the RAM segments
+    if (isRamFlush) {
+      deleter.deleteDirect(ramDirectory, spec.segments());
+    }
+
+    // Give deleter a chance to remove files now.
+    deleter.checkpoint(segmentInfos, autoCommit);
+  }
+
+  private synchronized void setMergeException(IOException e) {
+    mergeException = e;
+  }
+
+  private synchronized void checkMergeException() throws IOException {
+    if (mergeException != null) {
+      throw mergeException;
+    }
+  }
+
+  private class MergeThread extends Thread {
+    public void run() {
+      while (mergeThreadRunning) {
+        // synchronized
+        MergeSelector.MergeSpecification spec = selectSegmentsToMerge();
+
+        if (spec == null) {
+          try {
+            // nothing to flush/merge, sleep for a short while
+            sleep(sleepMilliSeconds);
+          }
+          catch (InterruptedException e) {
+            // ignore, no need to propagate this exception
+          }
+          continue;
+        } else {
+          try {
+            // not synchronized
+            SegmentInfo newSegment = mergeSelectedSegments(spec);
+
+            // synchronized
+            replaceSelectedSegments(spec, newSegment);
+          }
+          catch (IOException e) {
+            // abort the flush/merge
+            setMergeException(e);
+            continue;
+          } finally {
+            notifyAllAfterMerge();
+          }
+        }
+      }
+    }
+  }
+}
Index: java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- java/org/apache/lucene/index/IndexWriter.java	(revision 523522)
+++ java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -205,12 +205,12 @@
   private SegmentInfos localRollbackSegmentInfos;      // segmentInfos we will fallback to if the commit fails
   private boolean localAutoCommit;                // saved autoCommit during local transaction
-  private boolean autoCommit = true; // false if we should commit only on close
+  boolean autoCommit = true; // false if we should commit only on close
   SegmentInfos segmentInfos = new SegmentInfos();       // the segments
   SegmentInfos ramSegmentInfos = new SegmentInfos();    // the segments in ramDirectory
-  private final RAMDirectory ramDirectory = new RAMDirectory(); // for temp segs
-  private IndexFileDeleter deleter;
+  final RAMDirectory ramDirectory = new RAMDirectory(); // for temp segs
+  IndexFileDeleter deleter;
   private Lock writeLock;
@@ -233,7 +233,7 @@
   private boolean useCompoundFile = true;
   private boolean closeDir;
-  private boolean closed;
+  boolean closed;
   /**
    * @throws AlreadyClosedException if this IndexWriter is closed
@@ -1317,7 +1317,7 @@
    * commit the change
immediately. Else, we mark * commitPending. */ - private void checkpoint() throws IOException { + void checkpoint() throws IOException { if (autoCommit) { segmentInfos.write(directory); } else { @@ -1697,7 +1697,7 @@ * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public final synchronized void flush() throws CorruptIndexException, IOException { + public synchronized void flush() throws CorruptIndexException, IOException { ensureOpen(); flushRamSegments(); } @@ -1936,7 +1936,7 @@ // Called during flush to apply any buffered deletes. If // doMerge is true then a new segment was just created and // flushed from the ram segments. - private final void maybeApplyDeletes(boolean doMerge) throws CorruptIndexException, IOException { + final void maybeApplyDeletes(boolean doMerge) throws CorruptIndexException, IOException { if (bufferedDeleteTerms.size() > 0) { if (infoStream != null) @@ -2042,7 +2042,7 @@ // current number of documents buffered in ram so that the // delete term will be applied to those ram segments as // well as the disk segments. - private void bufferDeleteTerm(Term term) { + void bufferDeleteTerm(Term term) { Num num = (Num) bufferedDeleteTerms.get(term); if (num == null) { bufferedDeleteTerms.put(term, new Num(ramSegmentInfos.size())); Index: java/org/apache/lucene/index/SegmentMerger.java =================================================================== --- java/org/apache/lucene/index/SegmentMerger.java (revision 523522) +++ java/org/apache/lucene/index/SegmentMerger.java (working copy) @@ -67,6 +67,12 @@ termIndexInterval = writer.getTermIndexInterval(); } + SegmentMerger(Directory dir, String name, int interval) { + directory = dir; + segment = name; + termIndexInterval = interval; + } + /** * Add an IndexReader to the collection of readers that are to be merged * @param reader Index: java/org/apache/lucene/index/Merger.java =================================================================== --- java/org/apache/lucene/index/Merger.java (revision 0) +++ java/org/apache/lucene/index/Merger.java (revision 0) @@ -0,0 +1,113 @@ +package org.apache.lucene.index; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.List; + +import org.apache.lucene.store.Directory; + +class Merger { + IndexWriter writer; + + // Merger should not maintain a reference to a writer. + // Parameters such as useCompoundFile should be accessed + // and updated in this class. 
+  Merger(IndexWriter writer) {
+    this.writer = writer;
+  }
+
+  SegmentInfo merge(MergeSelector.MergeSpecification spec, String mergedName)
+      throws IOException {
+    // We may be called solely because there are deletes
+    // pending, in which case doMerge is false:
+    if (!spec.doMerge()) {
+      return null;
+    }
+
+    PrintStream infoStream = writer.getInfoStream();
+    Directory directory = writer.getDirectory();
+    int termIndexInterval = writer.getTermIndexInterval();
+    boolean useCompoundFile = writer.getUseCompoundFile();
+
+    if (infoStream != null)
+      infoStream.print("merging segments");
+    SegmentMerger merger = new SegmentMerger(directory, mergedName,
+        termIndexInterval);
+    SegmentInfo newSegment = null;
+
+    // This is try/finally to make sure merger's readers are closed:
+    try {
+      List segments = spec.segments();
+      for (int i = 0; i < segments.size(); i++) {
+        SegmentInfo si = (SegmentInfo)segments.get(i);
+        if (infoStream != null)
+          infoStream.print(" " + si.name + " (" + si.docCount + " docs)");
+        IndexReader reader = SegmentReader.get(si); // no need to set deleter (yet)
+        merger.add(reader);
+      }
+
+      boolean success = false;
+      try {
+        int mergedDocCount = merger.merge();
+
+        if (infoStream != null) {
+          infoStream.println(" into " + mergedName + " (" + mergedDocCount +
+              " docs)");
+        }
+
+        newSegment = new SegmentInfo(mergedName, mergedDocCount, directory,
+            false, true);
+        success = true;
+      } finally {
+        if (!success) {
+          // delete files for the partially created segment
+          // TODO should we use IndexFileDeleter?
+          String[] files = directory.list();
+          for (int i = 0; i < files.length; i++) {
+            if (files[i].startsWith(mergedName)) {
+              directory.deleteFile(files[i]);
+            }
+          }
+        }
+      }
+
+    } finally {
+      // close readers before we attempt to delete now-obsolete segments
+      merger.closeReaders();
+    }
+
+    if (useCompoundFile) {
+      boolean success = false;
+      try {
+        merger.createCompoundFile(mergedName + ".cfs");
+        newSegment.setUseCompoundFile(true);
+        success = true;
+      }
+      catch (IOException e) {
+        // ignore, no need to propagate this exception;
+        // fall back to the non-compound version if creating
+        // the compound file failed
+      } finally {
+        if (success) {
+          // delete files which the compound file replaces
+          // TODO should we use IndexFileDeleter?
+          List segmentFiles = newSegment.files();
+          String[] files = directory.list();
+          for (int i = 0; i < files.length; i++) {
+            if (files[i].startsWith(mergedName)
+                && !segmentFiles.contains(files[i])) {
+              directory.deleteFile(files[i]);
+            }
+          }
+        } else {
+          // delete partially created compound file
+          // TODO should we use IndexFileDeleter?
+          directory.deleteFile(mergedName + ".cfs");
+        }
+      }
+    }
+
+    return newSegment;
+  }
+}
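
Note (not part of the patch): a minimal usage sketch of IndexWriterConcurrentMerge, assembled only from the constructors and methods introduced above (setSleepMilliSeconds(), addDocument(), deleteDocuments(), optimize(), close()) plus the existing IndexWriter setters the tests use. The example class, field names, and document counts are illustrative assumptions, not part of the change.

// Usage sketch only -- not part of the patch.  All writer calls come from the
// classes added above; "id"/"content" and the doc counts are illustrative.
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriterConcurrentMerge;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class ConcurrentMergeExample {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();
    IndexWriterConcurrentMerge writer =
        new IndexWriterConcurrentMerge(dir, new WhitespaceAnalyzer(), true);
    writer.setMaxBufferedDocs(10);     // flush a new segment every 10 buffered docs
    writer.setMergeFactor(10);         // merge 10 segments at a time
    writer.setSleepMilliSeconds(100);  // poll interval of the background merge thread

    for (int i = 0; i < 1000; i++) {
      Document doc = new Document();
      doc.add(new Field("id", String.valueOf(i), Field.Store.YES, Field.Index.UN_TOKENIZED));
      doc.add(new Field("content", "aaa " + i, Field.Store.NO, Field.Index.TOKENIZED));
      // Only buffers a single-doc RAM segment; flushing/merging happens on the MergeThread.
      writer.addDocument(doc);
    }

    writer.deleteDocuments(new Term("id", "42"));  // buffered, applied at the next flush
    writer.optimize();   // waits for any in-progress merge, then optimizes as usual
    writer.close();      // stops the merge thread, then closes the underlying IndexWriter
  }
}

The point the sketch illustrates: unlike IndexWriter, addDocument() here only builds and buffers a single-document RAM segment; the writer's MergeThread performs flushes and merges asynchronously, and addDocument()/deleteDocuments() block only while the RAM-segment or buffered-delete-term limits are exceeded.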