Index: CHANGES.txt =================================================================== --- CHANGES.txt (revision 759556) +++ CHANGES.txt (working copy) @@ -207,6 +207,13 @@ a specific fields to set the score for a document. (Karl Wettin via Mike McCandless) +19. LUCENE-1516: Added "near real-time search" to IndexWriter, via a + new expert getReader() method. This method returns a reader that + searches the full index, including any uncommitted changes in the + current IndexWriter session. This should result in a faster + turnaround than the normal approach of commiting the changes and + then reopening a reader. (Jason Rutherglen via Mike McCandless) + Optimizations 1. LUCENE-1427: Fixed QueryWrapperFilter to not waste time computing Index: src/test/org/apache/lucene/TestSnapshotDeletionPolicy.java =================================================================== --- src/test/org/apache/lucene/TestSnapshotDeletionPolicy.java (revision 759556) +++ src/test/org/apache/lucene/TestSnapshotDeletionPolicy.java (working copy) @@ -61,6 +61,7 @@ MockRAMDirectory dir2 = new MockRAMDirectory(); runTest(dir2); + dir2.close(); } public void testReuseAcrossWriters() throws Exception { Index: src/test/org/apache/lucene/store/MockRAMOutputStream.java =================================================================== --- src/test/org/apache/lucene/store/MockRAMOutputStream.java (revision 759556) +++ src/test/org/apache/lucene/store/MockRAMOutputStream.java (working copy) @@ -29,13 +29,15 @@ public class MockRAMOutputStream extends RAMOutputStream { private MockRAMDirectory dir; private boolean first=true; + private final String name; byte[] singleByte = new byte[1]; /** Construct an empty output buffer. */ - public MockRAMOutputStream(MockRAMDirectory dir, RAMFile f) { + public MockRAMOutputStream(MockRAMDirectory dir, RAMFile f, String name) { super(f); this.dir = dir; + this.name = name; } public void close() throws IOException { Index: src/test/org/apache/lucene/store/MockRAMDirectory.java =================================================================== --- src/test/org/apache/lucene/store/MockRAMDirectory.java (revision 759556) +++ src/test/org/apache/lucene/store/MockRAMDirectory.java (working copy) @@ -212,7 +212,7 @@ if (preventDoubleWrite && createdFiles.contains(name) && !name.equals("segments.gen")) throw new IOException("file \"" + name + "\" was already written to"); if (noDeleteOpenFile && openFiles.containsKey(name)) - throw new IOException("MockRAMDirectory: file \"" + name + "\" is still open: cannot overwrite"); + throw new IOException("MockRAMDirectory: file \"" + name + "\" is still open: cannot overwrite"); } RAMFile file = new RAMFile(this); synchronized (this) { @@ -234,7 +234,7 @@ } } - return new MockRAMOutputStream(this, file); + return new MockRAMOutputStream(this, file, name); } public IndexInput openInput(String name) throws IOException { Index: src/test/org/apache/lucene/index/TestIndexReader.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexReader.java (revision 759556) +++ src/test/org/apache/lucene/index/TestIndexReader.java (working copy) @@ -44,6 +44,7 @@ import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.search.FieldCache; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; @@ -1130,7 +1131,7 @@ // 
IllegalStateException because above out-of-bounds // deleteDocument corrupted the index: writer.optimize(); - + writer.close(); if (!gotException) { fail("delete of out-of-bounds doc number failed to hit exception"); } @@ -1685,4 +1686,112 @@ s.close(); r.close(); } + + // LUCENE-1579: Ensure that on a cloned reader, segments + // reuse the doc values arrays in FieldCache + public void testFieldCacheReuseAfterClone() throws Exception { + Directory dir = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.UNLIMITED); + Document doc = new Document(); + doc.add(new Field("number", "17", Field.Store.NO, Field.Index.NOT_ANALYZED)); + writer.addDocument(doc); + writer.close(); + + // Open reader + IndexReader r = IndexReader.open(dir); + assertTrue(r instanceof SegmentReader); + final int[] ints = FieldCache.DEFAULT.getInts(r, "number"); + assertEquals(1, ints.length); + assertEquals(17, ints[0]); + + // Clone reader + IndexReader r2 = (IndexReader) r.clone(); + r.close(); + assertTrue(r2 != r); + assertTrue(r2 instanceof SegmentReader); + final int[] ints2 = FieldCache.DEFAULT.getInts(r2, "number"); + r2.close(); + + assertEquals(1, ints2.length); + assertEquals(17, ints2[0]); + assertTrue(ints == ints2); + + dir.close(); + } + + // LUCENE-1579: Ensure that on a reopened reader, that any + // shared segments reuse the doc values arrays in + // FieldCache + public void testFieldCacheReuseAfterReopen() throws Exception { + Directory dir = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.UNLIMITED); + Document doc = new Document(); + doc.add(new Field("number", "17", Field.Store.NO, Field.Index.NOT_ANALYZED)); + writer.addDocument(doc); + writer.commit(); + + // Open reader1 + IndexReader r = IndexReader.open(dir); + assertTrue(r instanceof SegmentReader); + final int[] ints = FieldCache.DEFAULT.getInts(r, "number"); + assertEquals(1, ints.length); + assertEquals(17, ints[0]); + + // Add new segment + writer.addDocument(doc); + writer.commit(); + + // Reopen reader1 --> reader2 + IndexReader r2 = (IndexReader) r.reopen(); + r.close(); + assertTrue(r2 instanceof MultiSegmentReader); + IndexReader sub0 = r2.getSequentialSubReaders()[0]; + assertTrue(sub0 instanceof SegmentReader); + final int[] ints2 = FieldCache.DEFAULT.getInts(sub0, "number"); + r2.close(); + assertTrue(ints == ints2); + + dir.close(); + } + + // LUCENE-1579: Make sure all SegmentReaders are new when + // reopen switches readOnly + public void testReopenChangeReadonly() throws Exception { + Directory dir = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.UNLIMITED); + Document doc = new Document(); + doc.add(new Field("number", "17", Field.Store.NO, Field.Index.NOT_ANALYZED)); + writer.addDocument(doc); + writer.commit(); + + // Open reader1 + IndexReader r = IndexReader.open(dir); + assertTrue(r instanceof SegmentReader); + final int[] ints = FieldCache.DEFAULT.getInts(r, "number"); + assertEquals(1, ints.length); + assertEquals(17, ints[0]); + + // Reopen to readonly w/ no chnages + IndexReader r3 = (IndexReader) r.reopen(true); + assertTrue(r3 instanceof ReadOnlySegmentReader); + r3.close(); + + // Add new segment + writer.addDocument(doc); + writer.commit(); + + // Reopen reader1 --> reader2 + IndexReader r2 = (IndexReader) r.reopen(true); + r.close(); + assertTrue(r2 instanceof MultiSegmentReader); + IndexReader[] 
subs = r2.getSequentialSubReaders(); + final int[] ints2 = FieldCache.DEFAULT.getInts(subs[0], "number"); + r2.close(); + + assertTrue(subs[0] instanceof ReadOnlySegmentReader); + assertTrue(subs[1] instanceof ReadOnlySegmentReader); + assertTrue(ints == ints2); + + dir.close(); + } } Index: src/test/org/apache/lucene/index/TestStressIndexing2.java =================================================================== --- src/test/org/apache/lucene/index/TestStressIndexing2.java (revision 759556) +++ src/test/org/apache/lucene/index/TestStressIndexing2.java (working copy) @@ -51,7 +51,21 @@ return true; } } - + + public void testRandomIWReader() throws Throwable { + r = newRandom(); + Directory dir = new MockRAMDirectory(); + + // TODO: verify equals using IW.getReader + DocsAndWriter dw = indexRandomIWReader(10, 100, 100, dir); + IndexReader r = dw.writer.getReader(); + dw.writer.commit(); + verifyEquals(r, dir, "id"); + r.close(); + dw.writer.close(); + dir.close(); + } + public void testRandom() throws Throwable { r = newRandom(); Directory dir1 = new MockRAMDirectory(); @@ -101,20 +115,69 @@ // This test avoids using any extra synchronization in the multiple // indexing threads to test that IndexWriter does correctly synchronize // everything. + + public static class DocsAndWriter { + Map docs; + IndexWriter writer; + } + + public DocsAndWriter indexRandomIWReader(int nThreads, int iterations, int range, Directory dir) throws IOException, InterruptedException { + Map docs = new HashMap(); + IndexWriter w = new MockIndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true); + w.setUseCompoundFile(false); + /*** + w.setMaxMergeDocs(Integer.MAX_VALUE); + w.setMaxFieldLength(10000); + w.setRAMBufferSizeMB(1); + w.setMergeFactor(10); + ***/ + + // force many merges + w.setMergeFactor(mergeFactor); + w.setRAMBufferSizeMB(.1); + w.setMaxBufferedDocs(maxBufferedDocs); + + threads = new IndexingThread[nThreads]; + for (int i=0; i 0); + final int count = warmer.warmCount; + + writer.addDocument(createDocument(17, "test", 4)); + writer.optimize(); + assertTrue(warmer.warmCount > count); + + writer.close(); + r1.close(); + dir1.close(); + } +} Property changes on: src/test/org/apache/lucene/index/TestIndexWriterReader.java ___________________________________________________________________ Added: svn:eol-style + native Index: src/test/org/apache/lucene/index/TestIndexReaderReopen.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexReaderReopen.java (revision 759556) +++ src/test/org/apache/lucene/index/TestIndexReaderReopen.java (working copy) @@ -949,7 +949,7 @@ r.close(); } - private static Document createDocument(int n, int numFields) { + public static Document createDocument(int n, int numFields) { StringBuffer sb = new StringBuffer(); Document doc = new Document(); sb.append("a"); Index: src/java/org/apache/lucene/search/FieldCacheImpl.java =================================================================== --- src/java/org/apache/lucene/search/FieldCacheImpl.java (revision 759556) +++ src/java/org/apache/lucene/search/FieldCacheImpl.java (working copy) @@ -50,11 +50,13 @@ public Object get(IndexReader reader, Object key) throws IOException { Map innerCache; Object value; + final Object wrapper = reader.getFieldCacheWrapper(); + synchronized (readerCache) { - innerCache = (Map) readerCache.get(reader); + innerCache = (Map) readerCache.get(wrapper); if (innerCache == null) { innerCache = new HashMap(); - readerCache.put(reader, 
innerCache); + readerCache.put(wrapper, innerCache); value = null; } else { value = innerCache.get(key); Index: src/java/org/apache/lucene/index/DirectoryIndexReader.java =================================================================== --- src/java/org/apache/lucene/index/DirectoryIndexReader.java (revision 759556) +++ src/java/org/apache/lucene/index/DirectoryIndexReader.java (working copy) @@ -51,6 +51,7 @@ * rollback is necessary */ private boolean rollbackHasChanges; private SegmentInfos rollbackSegmentInfos; + IndexWriter writer; protected boolean readOnly; @@ -183,10 +184,12 @@ newReader.init(directory, clonedInfos, closeDirectory, openReadOnly); newReader.deletionPolicy = deletionPolicy; } - + newReader.writer = writer; // If we're cloning a non-readOnly reader, move the // writeLock (if there is one) to the new reader: if (!openReadOnly && writeLock != null) { + // In near real-time search, reader is always readonly + assert writer == null; newReader.writeLock = writeLock; writeLock = null; hasChanges = false; @@ -203,6 +206,29 @@ assert commit == null || openReadOnly; + // If we were obtained by writer.getReader(), re-ask the + // writer to get a new reader. + if (writer != null) { + assert readOnly; + + if (!openReadOnly) { + throw new IllegalArgumentException("a reader obtained from IndexWriter.getReader() can only be reopened with openReadOnly=true (got false)"); + } + + if (commit != null) { + throw new IllegalArgumentException("a reader obtained from IndexWriter.getReader() cannot currently accept a commit"); + } + + if (!writer.isOpen(true)) { + throw new IllegalStateException("cannot reopen: the IndexWriter this reader was obtained from is now closed"); + } + + // TODO: right now we *always* make a new reader; in + // the future we could have write make some effort to + // detect that no changes have occurred + return writer.getReader(); + } + if (commit == null) { if (hasChanges) { // We have changes, which means we are not readOnly: Index: src/java/org/apache/lucene/index/SegmentInfo.java =================================================================== --- src/java/org/apache/lucene/index/SegmentInfo.java (revision 759556) +++ src/java/org/apache/lucene/index/SegmentInfo.java (working copy) @@ -79,6 +79,10 @@ private boolean hasProx; // True if this segment has any fields with omitTermFreqAndPositions==false + public String toString() { + return "si: "+dir.toString()+" "+name+" docCount: "+docCount+" delCount: "+delCount+" delFileName: "+getDelFileName(); + } + public SegmentInfo(String name, int docCount, Directory dir) { this.name = name; this.docCount = docCount; @@ -490,6 +494,12 @@ docStoreOffset = offset; clearFiles(); } + + void setDocStore(int offset, String segment, boolean isCompoundFile) { + docStoreOffset = offset; + docStoreSegment = segment; + docStoreIsCompoundFile = isCompoundFile; + } /** * Save this segment's info. 
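Before the SegmentReader.java changes: the FieldCacheImpl hunk above keys the cache on reader.getFieldCacheWrapper() rather than on the reader object itself, and that indirection is what lets the new LUCENE-1579 tests assert that cloned and reopened readers share the same cached value arrays. The following is a hypothetical caller-side sketch of the per-segment pattern those tests exercise; the helper class, method names and the "number" field are illustrative only and not part of this patch.

import java.io.IOException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.FieldCache;

// Hypothetical helper: populate FieldCache per segment so that a
// reopened reader reuses the cached int[] for unchanged segments.
class PerSegmentFieldCacheExample {

  static void loadCaches(IndexReader reader) throws IOException {
    IndexReader[] subs = reader.getSequentialSubReaders();
    if (subs == null) {
      // reader is itself a single segment (e.g. a SegmentReader)
      FieldCache.DEFAULT.getInts(reader, "number");
    } else {
      for (int i = 0; i < subs.length; i++) {
        FieldCache.DEFAULT.getInts(subs[i], "number");
      }
    }
  }

  // Refresh the reader; segments shared with the old reader resolve to
  // the same FieldCache entries, so only new segments pay the load cost.
  static IndexReader refresh(IndexReader reader) throws IOException {
    IndexReader newReader = reader.reopen();
    if (newReader != reader) {
      reader.close();
      loadCaches(newReader);
    }
    return newReader;
  }
}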
Index: src/java/org/apache/lucene/index/SegmentReader.java =================================================================== --- src/java/org/apache/lucene/index/SegmentReader.java (revision 759556) +++ src/java/org/apache/lucene/index/SegmentReader.java (working copy) @@ -45,6 +45,7 @@ private String segment; private SegmentInfo si; private int readBufferSize; + private FieldCacheWrapper fieldCacheWrapper; FieldInfos fieldInfos; private FieldsReader fieldsReaderOrig = null; @@ -469,6 +470,55 @@ return instance; } + synchronized void openDocStores() throws IOException { + if (fieldsReaderOrig == null) { + final Directory storeDir; + if (si.getDocStoreOffset() != -1) { + if (si.getDocStoreIsCompoundFile()) { + storeCFSReader = new CompoundFileReader(directory(), + si.getDocStoreSegment() + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION, + readBufferSize); + storeDir = storeCFSReader; + assert storeDir != null; + } else { + storeDir = directory(); + assert storeDir != null; + } + } else if (si.getUseCompoundFile()) { + // In some cases, we were originally opened when CFS + // was not used, but then we are asked to open doc + // stores after the segment has switched to CFS + if (cfsReader == null) { + cfsReader = new CompoundFileReader(directory(), segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION, readBufferSize); + } + storeDir = cfsReader; + assert storeDir != null; + } else { + storeDir = directory(); + assert storeDir != null; + } + + final String storesSegment; + if (si.getDocStoreOffset() != -1) { + storesSegment = si.getDocStoreSegment(); + } else { + storesSegment = segment; + } + + fieldsReaderOrig = new FieldsReader(storeDir, storesSegment, fieldInfos, readBufferSize, + si.getDocStoreOffset(), si.docCount); + + // Verify two sources of "maxDoc" agree: + if (si.getDocStoreOffset() == -1 && fieldsReaderOrig.size() != si.docCount) { + throw new CorruptIndexException("doc counts differ for segment " + si.name + ": fieldsReader shows " + fieldsReaderOrig.size() + " but segmentInfo shows " + si.docCount); + } + + if (fieldInfos.hasVectors()) { // open term vector files only as needed + termVectorsReaderOrig = new TermVectorsReader(storeDir, storesSegment, fieldInfos, readBufferSize, si.getDocStoreOffset(), si.docCount); + } + } + } + private void initialize(SegmentInfo si, int readBufferSize, boolean doOpenStores) throws CorruptIndexException, IOException { segment = si.name; this.si = si; @@ -484,47 +534,18 @@ cfsDir = cfsReader; } - final Directory storeDir; + fieldInfos = new FieldInfos(cfsDir, segment + ".fnm"); if (doOpenStores) { - if (si.getDocStoreOffset() != -1) { - if (si.getDocStoreIsCompoundFile()) { - storeCFSReader = new CompoundFileReader(directory(), si.getDocStoreSegment() + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION, readBufferSize); - storeDir = storeCFSReader; - } else { - storeDir = directory(); - } - } else { - storeDir = cfsDir; - } - } else - storeDir = null; + openDocStores(); + } - fieldInfos = new FieldInfos(cfsDir, segment + ".fnm"); - boolean anyProx = false; final int numFields = fieldInfos.size(); for(int i=0;!anyProx && iNOTE: If this reader is a near real-time + * reader (obtained from {@link IndexWriter#getReader()}, + * reopen() will simply call writer.getReader() again for + * you, though this may change in the future. 
* * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error @@ -1248,4 +1253,10 @@ public IndexReader[] getSequentialSubReaders() { return null; } + + /** Expert + * @deprecated */ + public Object getFieldCacheWrapper() { + return this; + } } Index: src/java/org/apache/lucene/index/DocumentsWriter.java =================================================================== --- src/java/org/apache/lucene/index/DocumentsWriter.java (revision 759556) +++ src/java/org/apache/lucene/index/DocumentsWriter.java (working copy) @@ -915,25 +915,17 @@ int docStart = 0; boolean any = false; for (int i = 0; i < infosEnd; i++) { - IndexReader reader = SegmentReader.get(infos.info(i), false); - boolean success = false; + + // Make sure we never attempt to apply deletes to + // segment in external dir + assert infos.info(i).dir == directory; + + SegmentReader reader = writer.readerPool.get(infos.info(i), false); try { any |= applyDeletes(reader, docStart); docStart += reader.maxDoc(); - success = true; } finally { - if (reader != null) { - try { - if (success) - reader.commit(); - } finally { - // Force reader to not have changes; if we hit - // an exception during commit, we don't want - // close to retry the commit: - reader.hasChanges = false; - reader.close(); - } - } + writer.readerPool.release(reader); } } Index: src/java/org/apache/lucene/index/MultiSegmentReader.java =================================================================== --- src/java/org/apache/lucene/index/MultiSegmentReader.java (revision 759556) +++ src/java/org/apache/lucene/index/MultiSegmentReader.java (working copy) @@ -51,24 +51,60 @@ SegmentReader[] readers = new SegmentReader[sis.size()]; for (int i = sis.size()-1; i >= 0; i--) { + boolean success = false; try { readers[i] = SegmentReader.get(readOnly, sis.info(i)); - } catch (IOException e) { - // Close all readers we had opened: - for(i++;i=0;i--) { + try { + readers[i].close(); + } catch (Exception ignore) { + // keep going - we want to clean up as much as possible + } + } + } + } + } + + this.writer = writer; + + initialize(readers); + } + /** This contructor is only used for {@link #reopen()} */ MultiSegmentReader(Directory directory, SegmentInfos infos, boolean closeDirectory, SegmentReader[] oldReaders, int[] oldStarts, Map oldNormsCache, boolean readOnly, boolean doClone) throws IOException { Index: src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- src/java/org/apache/lucene/index/IndexWriter.java (revision 759556) +++ src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -27,7 +27,7 @@ import org.apache.lucene.store.Lock; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.util.BitVector; +import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.util.Constants; import java.io.File; @@ -41,6 +41,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.Iterator; +import java.util.Map; /** An IndexWriter creates and maintains an index. 
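The hunk below adds the new near real-time entry point, IndexWriter.getReader(). As a rough, hypothetical usage sketch (drawn from the javadoc below and from TestIndexWriterReader and NearRealtimeReaderTask in this patch; the field names, analyzer and RAMDirectory are placeholders, not part of the patch), a caller would use it roughly like this:

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

// Hypothetical example of near real-time search via getReader()/reopen().
public class NearRealTimeExample {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(),
                                         IndexWriter.MaxFieldLength.UNLIMITED);

    Document doc = new Document();
    doc.add(new Field("id", "1", Field.Store.YES, Field.Index.NOT_ANALYZED));
    doc.add(new Field("body", "hello world", Field.Store.NO, Field.Index.ANALYZED));
    writer.addDocument(doc);

    // No commit or close needed: the reader also sees uncommitted changes.
    IndexReader reader = writer.getReader();
    IndexSearcher searcher = new IndexSearcher(reader);
    TopDocs hits = searcher.search(new TermQuery(new Term("body", "hello")), null, 10);
    System.out.println("hits=" + hits.totalHits);

    // After further changes, reopen() simply forwards to writer.getReader().
    writer.deleteDocuments(new Term("id", "1"));
    IndexReader newReader = reader.reopen();
    if (newReader != reader) {
      reader.close();
      reader = newReader;
    }

    reader.close();
    writer.close();
    dir.close();
  }
}

Note that getReader() first flushes any buffered documents (see the implementation below), so reopening very frequently will flush many small segments; the javadoc's suggestion to set a merged-segment warmer is aimed at keeping reopen latency low despite the merges that follow.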
@@ -367,8 +368,271 @@ // TODO: use ReadWriteLock once we are on 5.0 private int readCount; // count of how many threads are holding read lock private Thread writeThread; // non-null if any thread holds write lock + final ReaderPool readerPool = new ReaderPool(); private int upgradeCount; + + // This is a "write once" variable (like the organic dye + // on a DVD-R that may or may not be heated by a laser and + // then cooled to permanently record the event): it's + // false, until getReader() is called for the first time, + // at which point it's switched to true and never changes + // back to false. Once this is true, we hold open and + // reuse SegmentReader instances internally for applying + // deletes, doing merges, and reopening near real-time + // readers. + private volatile boolean poolReaders; + + /** + * Expert: returns a readonly reader containing all + * current updates. Flush is called automatically. This + * provides "near real-time" searching, in that changes + * made during an IndexWriter session can be made + * available for searching without closing the writer. + * + *

It's near real-time because there is no hard + * guarantee on how quickly you can get a new reader after + * making changes with IndexWriter. You'll have to + * experiment in your situation to determine if it's + * fast enough. As this is a new and experimental + * feature, please report back on your findings so we can + * learn, improve and iterate.

+ * + *

The resulting reader supports {@link + * IndexReader#reopen}, but that call will simply forward + * back to this method (though this may change in the + * future).

+ * + *

The very first time this method is called, this + * writer instance will make every effort to pool the + * readers that it opens for doing merges, applying + * deletes, etc. This means additional resources (RAM, + * file descriptors, CPU time) will be consumed.

+ * + *

NOTE: This is an experimental API and is + * subject to sudden change in non-back-compatible + * ways.

+ * + *

For lower latency on reopening a reader, you may + * want to call {@link #setMergedSegmentWarmer} to + * pre-warm a newly merged segment before it's committed + * to the index.

+ * + * @return IndexReader that covers entire index plus all + * changes made so far by this IndexWriter instance + * + * @throws IOException + */ + public IndexReader getReader() throws IOException { + if (infoStream != null) { + message("flush at getReader"); + } + // Do this up front before flushing so that the readers + // obtained during this flush are pooled, the first time + // this method is called: + poolReaders = true; + + flush(true, true, true); + + // Prevent segmentInfos from changing while opening the + // reader; in theory we could do similar retry logic, + // just like we do when loading segments_N + synchronized(this) { + return new ReadOnlyMultiSegmentReader(this, segmentInfos); + } + } + + /** Holds shared SegmentReader instances. IndexWriter uses + * SegmentReaders for 1) applying deletes, 2) doing + * merges, 3) handing out a real-time reader. This pool + * reuses instances of the SegmentReaders in all these + * places if it is in "near real-time mode" (getReader() + * has been called on this instance). */ + + class ReaderPool { + + private final Map readerMap = new HashMap(); + + /** Forcefully clear changes for the specifed segments, + * and remove from the pool. This is called on succesful merge. */ + synchronized void clear(SegmentInfos infos) throws IOException { + if (infos == null) { + Iterator iter = readerMap.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry ent = (Map.Entry) iter.next(); + ((SegmentReader) ent.getValue()).hasChanges = false; + } + } else { + final int numSegments = infos.size(); + for(int i=0;i 0) @@ -415,6 +679,10 @@ notifyAll(); } + synchronized final boolean isOpen(boolean includePendingClose) { + return !(closed || (includePendingClose && closing)); + } + /** * Used internally to throw an {@link * AlreadyClosedException} if this IndexWriter has been @@ -422,7 +690,7 @@ * @throws AlreadyClosedException if this IndexWriter is */ protected synchronized final void ensureOpen(boolean includePendingClose) throws AlreadyClosedException { - if (closed || (includePendingClose && closing)) { + if (!isOpen(includePendingClose)) { throw new AlreadyClosedException("this IndexWriter is closed"); } } @@ -1795,6 +2063,7 @@ message("at close: " + segString()); synchronized(this) { + readerPool.close(); docWriter = null; deleter.close(); } @@ -1851,6 +2120,10 @@ if (useCompoundDocStore && docStoreSegment != null && docWriter.closedFiles().size() != 0) { // Now build compound doc store file + if (infoStream != null) { + message("create compound file " + docStoreSegment + "." 
+ IndexFileNames.COMPOUND_FILE_STORE_EXTENSION); + } + success = false; final int numSegments = segmentInfos.size(); @@ -2779,7 +3052,9 @@ // First restore autoCommit in case we hit an exception below: autoCommit = localAutoCommit; - docWriter.setFlushedDocCount(localFlushedDocCount); + if (docWriter != null) { + docWriter.setFlushedDocCount(localFlushedDocCount); + } // Must finish merges before rolling back segmentInfos // so merges don't hit exceptions on trying to commit @@ -2935,6 +3210,9 @@ deleter.refresh(); } + // Don't bother saving any changes in our segmentInfos + readerPool.clear(null); + lastCommitChangeCount = changeCount; success = true; @@ -3121,7 +3399,9 @@ hitOOM = true; throw oom; } finally { - docWriter.resumeAllThreads(); + if (docWriter != null) { + docWriter.resumeAllThreads(); + } } } @@ -3262,7 +3542,9 @@ hitOOM = true; throw oom; } finally { - docWriter.resumeAllThreads(); + if (docWriter != null) { + docWriter.resumeAllThreads(); + } } } @@ -3410,10 +3692,10 @@ mergedName = newSegmentName(); merger = new SegmentMerger(this, mergedName, null); - IndexReader sReader = null; + SegmentReader sReader = null; synchronized(this) { if (segmentInfos.size() == 1) { // add existing index, if any - sReader = SegmentReader.get(true, segmentInfos.info(0)); + sReader = readerPool.get(segmentInfos.info(0), true); } } @@ -3428,11 +3710,6 @@ int docCount = merger.merge(); // merge 'em - if(sReader != null) { - sReader.close(); - sReader = null; - } - synchronized(this) { segmentInfos.clear(); // pop old infos & add new info = new SegmentInfo(mergedName, docCount, directory, false, true, @@ -3447,7 +3724,7 @@ } finally { if (sReader != null) { - sReader.close(); + readerPool.release(sReader); } } } finally { @@ -3508,7 +3785,9 @@ hitOOM = true; throw oom; } finally { - docWriter.resumeAllThreads(); + if (docWriter != null) { + docWriter.resumeAllThreads(); + } } } @@ -3757,6 +4036,9 @@ // stores when we flush flushDocStores |= autoCommit; String docStoreSegment = docWriter.getDocStoreSegment(); + + assert docStoreSegment != null || numDocs == 0; + if (docStoreSegment == null) flushDocStores = false; @@ -3898,7 +4180,7 @@ int first = segmentInfos.indexOf(merge.segments.info(0)); if (first == -1) - throw new MergePolicy.MergeException("could not find segment " + merge.segments.info(0).name + " in current segments", directory); + throw new MergePolicy.MergeException("could not find segment " + merge.segments.info(0).name + " in current index " + segString(), directory); final int numSegments = segmentInfos.size(); @@ -3908,7 +4190,7 @@ if (first + i >= numSegments || !segmentInfos.info(first+i).equals(info)) { if (segmentInfos.indexOf(info) == -1) - throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the index", directory); + throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the current index " + segString(), directory); else throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge.segString(directory) + " vs " + segString() + "), which IndexWriter (currently) cannot handle", directory); @@ -3927,11 +4209,10 @@ * saves the resulting deletes file (incrementing the * delete generation for merge.info). If no deletes were * flushed, no new deletes file is saved. 
*/ - synchronized private void commitMergedDeletes(MergePolicy.OneMerge merge) throws IOException { + synchronized private void commitMergedDeletes(MergePolicy.OneMerge merge, SegmentReader mergeReader) throws IOException { assert testPoint("startCommitMergeDeletes"); - final SegmentInfos sourceSegmentsClone = merge.segmentsClone; final SegmentInfos sourceSegments = merge.segments; if (infoStream != null) @@ -3939,22 +4220,16 @@ // Carefully merge deletes that occurred after we // started merging: - - BitVector deletes = null; int docUpto = 0; int delCount = 0; - final int numSegmentsToMerge = sourceSegments.size(); - for(int i=0;i previousReader.numDeletedDocs()) { // This means this segment has had new deletes // committed since we started the merge, so we // must merge them: - if (deletes == null) - deletes = new BitVector(merge.info.docCount); - - BitVector currentDeletes = new BitVector(currentInfo.dir, currentInfo.getDelFileName()); for(int j=0;j= 0; } /* FIXME if we want to support non-contiguous segment merges */ - synchronized private boolean commitMerge(MergePolicy.OneMerge merge, SegmentMerger merger, int mergedDocCount) throws IOException { + synchronized private boolean commitMerge(MergePolicy.OneMerge merge, SegmentMerger merger, int mergedDocCount, SegmentReader mergedReader) throws IOException { assert testPoint("startCommitMerge"); @@ -4048,8 +4304,7 @@ final int start = ensureContiguousMerge(merge); - commitMergedDeletes(merge); - + commitMergedDeletes(merge, mergedReader); docWriter.remapDeletes(segmentInfos, merger.getDocMaps(), merger.getDelCounts(), merge, mergedDocCount); // Simple optimization: if the doc store we are using @@ -4077,22 +4332,19 @@ assert !segmentInfos.contains(merge.info); segmentInfos.add(start, merge.info); - // Must checkpoint before decrefing so any newly - // referenced files in the new merge.info are incref'd - // first: - checkpoint(); + // If the merged segments had pending changes, clear + // them so that they don't bother writing them to + // disk, updating SegmentInfo, etc.: + readerPool.clear(merge.segments); - decrefMergeSegments(merge); - if (merge.optimize) segmentsToOptimize.add(merge.info); return true; } - - private void decrefMergeSegments(MergePolicy.OneMerge merge) throws IOException { + + private synchronized void decrefMergeSegments(MergePolicy.OneMerge merge) throws IOException { assert merge.increfDone; merge.increfDone = false; - deleter.decRef(merge.segmentsClone); } final private void handleMergeException(Throwable t, MergePolicy.OneMerge merge) throws IOException { @@ -4344,15 +4596,8 @@ if (infoStream != null) message("now flush at merge"); doFlush(true, false); - //flush(false, true, false); } - // We must take a full copy at this point so that we can - // properly merge deletes in commitMerge() - merge.segmentsClone = (SegmentInfos) merge.segments.clone(); - - deleter.incRef(merge.segmentsClone, false); - merge.increfDone = true; merge.mergeDocStores = mergeDocStores; @@ -4452,47 +4697,148 @@ int mergedDocCount = 0; SegmentInfos sourceSegments = merge.segments; - SegmentInfos sourceSegmentsClone = merge.segmentsClone; final int numSegments = sourceSegments.size(); if (infoStream != null) message("merging " + merge.segString(directory)); merger = new SegmentMerger(this, mergedName, merge); + + merge.readers = new SegmentReader[numSegments]; + merge.readersClone = new SegmentReader[numSegments]; + + boolean mergeDocStores = false; + + final Set dss = new HashSet(); // This is try/finally to make sure merger's readers 
are // closed: + boolean success = false; try { int totDocCount = 0; for (int i = 0; i < numSegments; i++) { - SegmentInfo si = sourceSegmentsClone.info(i); - IndexReader reader = SegmentReader.get(true, si, MERGE_READ_BUFFER_SIZE, merge.mergeDocStores); // no need to set deleter (yet) - merger.add(reader); - totDocCount += reader.numDocs(); + + final SegmentInfo info = sourceSegments.info(i); + + // Hold onto the "live" reader; we will use this to + // commit merged deletes + SegmentReader reader = merge.readers[i] = readerPool.get(info, merge.mergeDocStores, + MERGE_READ_BUFFER_SIZE); + + // We clone the segment readers because other + // deletes may come in while we're merging so we + // need readers that will not change + SegmentReader clone = merge.readersClone[i] = (SegmentReader) reader.clone(true); + merger.add(clone); + + if (clone.hasDeletions()) { + mergeDocStores = true; + } + + if (info.getDocStoreOffset() != -1) { + dss.add(info.getDocStoreSegment()); + } + + totDocCount += clone.numDocs(); } + if (infoStream != null) { message("merge: total "+totDocCount+" docs"); } merge.checkAborted(directory); + // If deletions have arrived and it has now become + // necessary to merge doc stores, go and open them: + if (mergeDocStores && !merge.mergeDocStores) { + merge.mergeDocStores = true; + synchronized(this) { + if (dss.contains(docWriter.getDocStoreSegment())) { + if (infoStream != null) + message("now flush at mergeMiddle"); + doFlush(true, false); + } + } + + // nocommit -- how bad is it that we do this open on + // the clones and not orig? + for(int i=0;iNOTE: This is an experimental API and is + * subject to suddenly change in non-back-compatible + * ways. + * + *

NOTE: warm is called before any deletes have + * been carried over to the merged segment. */ + public static abstract class IndexReaderWarmer { + public abstract void warm(IndexReader reader) throws IOException; + } + + private IndexReaderWarmer mergedSegmentWarmer; + + /** Set the merged segment warmer. See {@link + * IndexReaderWarmer}. */ + public void setMergedSegmentWarmer(IndexReaderWarmer warmer) { + mergedSegmentWarmer = warmer; + } + // Used only by assert for testing. Current points: // startDoFlush // startCommitMerge Index: src/java/org/apache/lucene/index/IndexFileDeleter.java =================================================================== --- src/java/org/apache/lucene/index/IndexFileDeleter.java (revision 759556) +++ src/java/org/apache/lucene/index/IndexFileDeleter.java (working copy) @@ -485,7 +485,7 @@ private RefCount getRefCount(String fileName) { RefCount rc; if (!refCounts.containsKey(fileName)) { - rc = new RefCount(); + rc = new RefCount(fileName); refCounts.put(fileName, rc); } else { rc = (RefCount) refCounts.get(fileName); @@ -543,14 +543,26 @@ */ final private static class RefCount { + // fileName used only for better assert error messages + final String fileName; + boolean initDone; + RefCount(String fileName) { + this.fileName = fileName; + } + int count; public int IncRef() { + if (!initDone) { + initDone = true; + } else { + assert count > 0: "RefCount is 0 pre-increment for file \"" + fileName + "\""; + } return ++count; } public int DecRef() { - assert count > 0; + assert count > 0: "RefCount is 0 pre-decrement for file \"" + fileName + "\""; return --count; } } Index: src/java/org/apache/lucene/index/ReadOnlyMultiSegmentReader.java =================================================================== --- src/java/org/apache/lucene/index/ReadOnlyMultiSegmentReader.java (revision 759556) +++ src/java/org/apache/lucene/index/ReadOnlyMultiSegmentReader.java (working copy) @@ -30,7 +30,11 @@ ReadOnlyMultiSegmentReader(Directory directory, SegmentInfos infos, boolean closeDirectory, SegmentReader[] oldReaders, int[] oldStarts, Map oldNormsCache, boolean doClone) throws IOException { super(directory, infos, closeDirectory, oldReaders, oldStarts, oldNormsCache, true, doClone); } - + + ReadOnlyMultiSegmentReader(IndexWriter writer, SegmentInfos infos) throws IOException { + super(writer, infos); + } + protected void acquireWriteLock() { ReadOnlySegmentReader.noWrite(); } Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java =================================================================== --- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java (revision 0) +++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java (revision 0) @@ -0,0 +1,132 @@ +package org.apache.lucene.benchmark.byTask.tasks; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +import org.apache.lucene.benchmark.byTask.PerfRunData; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.store.Directory; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.TopFieldDocs; +import org.apache.lucene.index.Term; + +/** + * Open an index reader. + *
Other side effects: index redaer object in perfRunData is set. + */ +public class NearRealtimeReaderTask extends PerfTask { + + ReopenThread t; + float pauseSec = 3.0f; + + private static class ReopenThread extends Thread { + + final IndexWriter writer; + final int pauseMsec; + + public volatile boolean done; + + ReopenThread(IndexWriter writer, float pauseSec) { + this.writer = writer; + this.pauseMsec = (int) (1000*pauseSec); + setDaemon(true); + } + + public void run() { + + IndexReader reader = null; + + final Query query = new TermQuery(new Term("body", "1")); + final SortField sf = new SortField("docdate", SortField.LONG); + final Sort sort = new Sort(sf); + int count = 0; + + try { + while(!done) { + final long t0 = System.currentTimeMillis(); + if (reader == null) { + reader = writer.getReader(); + } else { + final IndexReader newReader = reader.reopen(); + if (reader != newReader) { + reader.close(); + reader = newReader; + } + } + final long t1 = System.currentTimeMillis(); + + final TopFieldDocs hits = new IndexSearcher(reader).search(query, null, 10, sort); + + final long t2 = System.currentTimeMillis(); + + //final TopFieldDocs hits2 = new IndexSearcher(reader).search(query, null, 10, sort); + + //final long t3 = System.currentTimeMillis(); + + //System.out.println("rr: open " + (t1-t0) + " msec; search " + (t2-t1) + " msec, " + hits.totalHits + + //" results; search2 " + (t3-t2) + " msec; " + reader.numDocs() + " docs"); + + System.out.println("rr: open " + (t1-t0) + " msec; search " + (t2-t1) + " msec, " + hits.totalHits + + " results; " + reader.numDocs() + " docs"); + + final long t4 = System.currentTimeMillis(); + final int delay = (int) (pauseMsec - (t4-t0)); + if (delay > 0) { + try { + Thread.sleep(delay); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + public NearRealtimeReaderTask(PerfRunData runData) { + super(runData); + } + + public int doLogic() throws IOException { + if (t == null) { + IndexWriter w = getRunData().getIndexWriter(); + t = new ReopenThread(w, pauseSec); + t.start(); + } + return 1; + } + + // nocommit -- need to close thread on exit + + public void setParams(String params) { + super.setParams(params); + pauseSec = Float.parseFloat(params); + } + + public boolean supportsParams() { + return true; + } +} Property changes on: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java ___________________________________________________________________ Added: svn:eol-style + native Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java =================================================================== --- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java (revision 0) +++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java (revision 0) @@ -0,0 +1,109 @@ +package org.apache.lucene.benchmark.byTask.tasks; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.benchmark.byTask.PerfRunData; +import org.apache.lucene.benchmark.byTask.feeds.DocMaker; +import org.apache.lucene.benchmark.byTask.feeds.BasicDocMaker; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.Term; +import java.text.NumberFormat; + + +/** + * Update a document, optionally of a certain size. + *
Other side effects: none. + *
Relevant properties: doc.add.log.step. + *
Takes optional param: document size. + */ +public class UpdateDocTask extends PerfTask { + + public UpdateDocTask(PerfRunData runData) { + super(runData); + } + + private int logStep = -1; + private int docSize = 0; + int count = 0; + + // volatile data passed between setup(), doLogic(), tearDown(). + private Document doc = null; + + /* + * (non-Javadoc) + * @see PerfTask#setup() + */ + public void setup() throws Exception { + super.setup(); + DocMaker docMaker = getRunData().getDocMaker(); + if (docSize > 0) { + doc = docMaker.makeDocument(docSize); + } else { + doc = docMaker.makeDocument(); + } + } + + /* (non-Javadoc) + * @see PerfTask#tearDown() + */ + public void tearDown() throws Exception { + log(++count); + doc = null; + super.tearDown(); + } + + public int doLogic() throws Exception { + final String docID = doc.get(BasicDocMaker.ID_FIELD); + if (docID == null) { + throw new IllegalStateException("document must define the docid field"); + } + getRunData().getIndexWriter().updateDocument(new Term(BasicDocMaker.ID_FIELD, docID), + doc); + return 1; + } + + private void log (int count) { + if (logStep<0) { + // init once per instance + logStep = getRunData().getConfig().get("doc.add.log.step",AddDocTask.DEFAULT_ADD_DOC_LOG_STEP); + } + if (logStep>0 && (count%logStep)==0) { + double seconds = (System.currentTimeMillis() - getRunData().getStartTimeMillis())/1000.0; + NumberFormat nf = NumberFormat.getInstance(); + nf.setMaximumFractionDigits(2); + System.out.println("--> "+nf.format(seconds) + " sec: " + Thread.currentThread().getName()+" processed (update) "+count+" docs"); + } + } + + /** + * Set the params (docSize only) + * @param params docSize, or 0 for no limit. + */ + public void setParams(String params) { + super.setParams(params); + docSize = (int) Float.parseFloat(params); + } + + /* (non-Javadoc) + * @see org.apache.lucene.benchmark.byTask.tasks.PerfTask#supportsParams() + */ + public boolean supportsParams() { + return true; + } + +} Property changes on: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java ___________________________________________________________________ Added: svn:eol-style + native Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/LineDocMaker.java =================================================================== --- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/LineDocMaker.java (revision 759556) +++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/LineDocMaker.java (working copy) @@ -22,12 +22,15 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; +import org.apache.lucene.index.IndexWriter; import java.io.BufferedReader; import java.io.IOException; import java.io.FileInputStream; import java.io.InputStreamReader; +import java.util.Random; + /** * A DocMaker reading one line at a time as a Document from * a single file. 
This saves IO cost (over DirDocMaker) of @@ -38,6 +41,9 @@ * Config properties: * docs.file=<path to the file%gt; * doc.reuse.fields=true|false (default true) + * doc.random.id.limit=N (default -1) -- create random + * docIDs in the range 0..N; this is useful + * with UpdateDoc to test updating random documents */ public class LineDocMaker extends BasicDocMaker { @@ -50,6 +56,8 @@ private final DocState localDocState = new DocState(); private boolean doReuseFields = true; + private Random r; + private int numDocs; class DocState { Document doc; @@ -86,6 +94,11 @@ final static String SEP = WriteLineDocTask.SEP; + private int numDocsCreated; + private synchronized int incrNumDocsCreated() { + return numDocsCreated++; + } + public Document setFields(String line) { // title date body final String title, date, body; @@ -102,12 +115,22 @@ } else title = date = body = ""; + final String docID; + if (r != null) { + docID = "doc" + r.nextInt(numDocs); + } else { + docID = "doc" + incrNumDocsCreated(); + } + if (doReuseFields) { + idField.setValue(docID); titleField.setValue(title); dateField.setValue(date); bodyField.setValue(body); return doc; } else { + Field localIDField = new Field(BasicDocMaker.ID_FIELD, docID, Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS); + Field localTitleField = new Field(BasicDocMaker.TITLE_FIELD, title, storeVal, @@ -183,6 +206,10 @@ public void setConfig(Config config) { super.setConfig(config); doReuseFields = config.get("doc.reuse.fields", true); + numDocs = config.get("doc.random.id.limit", -1); + if (numDocs != -1) { + r = new Random(179); + } } synchronized void openFile() {