Index: CHANGES.txt =================================================================== --- CHANGES.txt (revision 788197) +++ CHANGES.txt (working copy) @@ -491,7 +491,14 @@ parsers for fields generated using NumericField/NumericTokenStream. All standard parsers now also implement Serializable and enforce their singleton status. (Uwe Schindler, Mike McCandless) - + +31. LUCENE-1313: Near realtime search enhancement. NRT may be turned on + in the IndexWriter. When on, new segments are flushed to a RAMDirectory + first, merged in RAM until a limit is exceeded, then flushed to the + primary/regular directory. The feature reduces the turnaround time + for IndexWriter.getReader after making small updates. + (Jason Rutherglen via Mike McCandless) + Optimizations 1. LUCENE-1427: Fixed QueryWrapperFilter to not waste time computing Index: contrib/benchmark/build.xml =================================================================== --- contrib/benchmark/build.xml (revision 789908) +++ contrib/benchmark/build.xml (working copy) @@ -7,8 +7,8 @@ + - Index: src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java =================================================================== --- src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 789908) +++ src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (working copy) @@ -207,7 +207,7 @@ } if (verbose()) - message(" consider merge " + merge.segString(dir)); + message(" consider merge " + merge.segString()); assert mergeThreadCount() < maxThreadCount; @@ -296,7 +296,7 @@ if (merge != null) { writer.mergeInit(merge); if (verbose()) - message(" merge thread: do another merge " + merge.segString(dir)); + message(" merge thread: do another merge " + merge.segString()); } else break; } @@ -328,7 +328,7 @@ MergePolicy.OneMerge merge = getRunningMerge(); if (merge == null) merge = startMerge; - return "merge thread: " + merge.segString(dir); + return "merge thread: " + merge.segString(); } } Index: src/java/org/apache/lucene/index/DirectoryReader.java =================================================================== --- src/java/org/apache/lucene/index/DirectoryReader.java (revision 789908) +++ src/java/org/apache/lucene/index/DirectoryReader.java (working copy) @@ -139,7 +139,7 @@ boolean success = false; try { final SegmentInfo info = infos.info(upto); - if (info.dir == dir) { + if (info.dir.contains(dir)) { readers[upto++] = writer.readerPool.getReadOnlyClone(info, true); } success = true; Index: src/java/org/apache/lucene/index/DocumentsWriter.java =================================================================== --- src/java/org/apache/lucene/index/DocumentsWriter.java (revision 789908) +++ src/java/org/apache/lucene/index/DocumentsWriter.java (working copy) @@ -37,6 +37,7 @@ import org.apache.lucene.search.Similarity; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.util.ArrayUtil; /** @@ -110,7 +111,6 @@ final class DocumentsWriter { IndexWriter writer; - Directory directory; String segment; // Current segment we are working on private String docStoreSegment; // Current doc-store segment we are writing @@ -233,17 +233,9 @@ // they must be flushed to disk. private int maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS; - // How much RAM we can use before flushing. This is 0 if - // we are flushing by doc count instead. 
- private long ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024); - private long waitQueuePauseBytes = (long) (ramBufferSize*0.1); - private long waitQueueResumeBytes = (long) (ramBufferSize*0.05); + private long waitQueuePauseBytes; + private long waitQueueResumeBytes; - // If we've allocated 5% over our RAM budget, we then - // free down to 95% - private long freeTrigger = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*1.05); - private long freeLevel = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*0.95); - // Flush @ this number of docs. If ramBufferSize is // non-zero we will flush by RAM usage instead. private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS; @@ -262,8 +254,7 @@ private boolean closed; - DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain) throws IOException { - this.directory = directory; + DocumentsWriter(IndexWriter writer, IndexingChain indexingChain) throws IOException { this.writer = writer; this.similarity = writer.getSimilarity(); flushedDocCount = writer.maxDoc(); @@ -274,6 +265,16 @@ } } + void setMaxRAM(long value) { + if (value < 0) { + waitQueuePauseBytes = 4*1024*1024; + waitQueueResumeBytes = 2*1024*1024; + } else { + waitQueuePauseBytes = (long) (value*0.1); + waitQueueResumeBytes = (long) (value*0.05); + } + } + /** Returns true if any of the fields in the current * buffered docs have omitTermFreqAndPositions==false */ boolean hasProx() { @@ -306,29 +307,6 @@ threadStates[i].docState.allowMinus1Position = true; } - /** Set how much RAM we can use before flushing. */ - synchronized void setRAMBufferSizeMB(double mb) { - if (mb == IndexWriter.DISABLE_AUTO_FLUSH) { - ramBufferSize = IndexWriter.DISABLE_AUTO_FLUSH; - waitQueuePauseBytes = 4*1024*1024; - waitQueueResumeBytes = 2*1024*1024; - } else { - ramBufferSize = (long) (mb*1024*1024); - waitQueuePauseBytes = (long) (ramBufferSize*0.1); - waitQueueResumeBytes = (long) (ramBufferSize*0.05); - freeTrigger = (long) (1.05 * ramBufferSize); - freeLevel = (long) (0.95 * ramBufferSize); - } - } - - synchronized double getRAMBufferSizeMB() { - if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH) { - return ramBufferSize; - } else { - return ramBufferSize/1024./1024.; - } - } - /** Set max buffered docs, which means we will flush by * doc count instead of by RAM usage. 
*/ void setMaxBufferedDocs(int count) { @@ -370,12 +348,12 @@ assert allThreadsIdle(); if (infoStream != null) - message("closeDocStore: " + openFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore); + message("closeDocStore: " + openFiles.size() + " files to flush to docStoreSegment " + docStoreSegment + " numDocs=" + numDocsInStore); boolean success = false; try { - initFlushState(true); + initFlushState(true, writer.directory); // doc stores always write to the primary directory closedFiles.clear(); consumer.closeDocStore(flushState); @@ -542,13 +520,13 @@ return true; } - synchronized private void initFlushState(boolean onlyDocStore) { + synchronized private void initFlushState(boolean onlyDocStore, Directory directory) { initSegmentName(onlyDocStore); flushState = new SegmentWriteState(this, directory, segment, docStoreSegment, numDocsInRAM, numDocsInStore, writer.getTermIndexInterval()); } /** Flush all pending docs to a new segment */ - synchronized int flush(boolean closeDocStore) throws IOException { + synchronized int flush(boolean closeDocStore, Directory flushToDir) throws IOException { assert allThreadsIdle(); @@ -557,13 +535,14 @@ assert nextDocID == numDocsInRAM; assert waitQueue.numWaiting == 0; assert waitQueue.waitingBytes == 0; + + assert flushToDir != null; + initFlushState(false, flushToDir); - initFlushState(false); - docStoreOffset = numDocsInStore; if (infoStream != null) - message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM); + message("flush postings closeDocStore:"+closeDocStore+" as segment " + flushState.segmentName + " to "+writer.printDir(flushToDir)+" numDocs=" + numDocsInRAM); boolean success = false; @@ -582,7 +561,7 @@ consumer.flush(threads, flushState); if (infoStream != null) { - final long newSegmentSize = segmentSize(flushState.segmentName); + final long newSegmentSize = segmentSize(flushState.segmentName, flushToDir); String message = " oldRAMSize=" + numBytesUsed + " newFlushedSize=" + newSegmentSize + " docs/MB=" + nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) + @@ -608,8 +587,9 @@ } /** Build compound file for the segment we just flushed */ - void createCompoundFile(String segment) throws IOException { - + void createCompoundFile(String segment, Directory directory) throws IOException { + // compounds files should only be written to the primaryDir + assert directory == writer.directory; CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." 
+ IndexFileNames.COMPOUND_FILE_EXTENSION); Iterator it = flushState.flushedFiles.iterator(); while(it.hasNext()) @@ -922,11 +902,13 @@ int docStart = 0; boolean any = false; + Directory switchDir = writer.fileSwitchDirectory; + Directory directory = writer.directory; for (int i = 0; i < infosEnd; i++) { // Make sure we never attempt to apply deletes to // segment in external dir - assert infos.info(i).dir == directory; + assert switchDir.contains(infos.info(i).dir); SegmentReader reader = writer.readerPool.get(infos.info(i), false); try { @@ -1026,15 +1008,19 @@ synchronized private void addDeleteQuery(Query query, int docID) { deletesInRAM.queries.put(query, new Integer(flushedDocCount + docID)); } - + synchronized boolean doBalanceRAM() { - return ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && !bufferIsFull && (numBytesUsed >= ramBufferSize || numBytesAlloc >= freeTrigger); + long ramBufAvail = writer.ramPolicy.getRAMBufferLimit(0.90); + long freeTrigger = (long)(ramBufAvail*1.05); + boolean numallocover = numBytesAlloc >= freeTrigger; + boolean numOver = numBytesUsed >= ramBufAvail; + return !bufferIsFull && (numOver || numallocover); } /** Does the synchronized work to finish/flush the * inverted document. */ private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException { - + // if we can't grow the ram buffer any more then we need to balance the ram if (doBalanceRAM()) // Must call this w/o holding synchronized(this) else // we'll hit deadlock: @@ -1119,7 +1105,7 @@ // TODO FI: this is not flexible -- we can't hardwire // extensions in here: - private long segmentSize(String segmentName) throws IOException { + private long segmentSize(String segmentName, Directory directory) throws IOException { // Used only when infoStream != null assert infoStream != null; @@ -1283,8 +1269,9 @@ void balanceRAM() { // We flush when we've used our target usage - final long flushTrigger = ramBufferSize; - + final long flushTrigger = writer.ramPolicy.getRAMBufferLimit(0.90); + long freeTrigger = (long)(flushTrigger * 1.05); + long freeLevel = (long) (0.95 * flushTrigger); if (numBytesAlloc > freeTrigger) { if (infoStream != null) Index: src/java/org/apache/lucene/index/IndexFileDeleter.java =================================================================== --- src/java/org/apache/lucene/index/IndexFileDeleter.java (revision 789908) +++ src/java/org/apache/lucene/index/IndexFileDeleter.java (working copy) @@ -18,6 +18,7 @@ */ import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FileSwitchDirectory; import java.io.IOException; import java.io.FileNotFoundException; @@ -509,12 +510,37 @@ deleteFile(fileName); } } - + + private Directory getDir(String name) throws IOException { + if (directory instanceof FileSwitchDirectory) { + FileSwitchDirectory fsd = (FileSwitchDirectory)directory; + Directory dir = fsd.getDirectory(name); + return dir; + } else { + return directory; + } + } + + String getDirName(String fileName) throws IOException { + return getDirName(getDir(fileName)); + } + + String getDirName(Directory dir) { + if (directory instanceof FileSwitchDirectory) { + FileSwitchDirectory fsd = (FileSwitchDirectory)directory; + if (dir == fsd.getPrimaryDir()) return "prim"; + if (dir == fsd.getSecondaryDir()) return "fsd"; + return "ext"; + } else { + return "prim"; + } + } + void deleteFile(String fileName) throws IOException { try { if (infoStream != null) { - message("delete \"" + fileName + "\""); + message("delete \"" + fileName + "\" 
in "+getDirName(fileName)); } directory.deleteFile(fileName); } catch (IOException e) { // if delete fails @@ -528,7 +554,7 @@ // the file for subsequent deletion. if (infoStream != null) { - message("IndexFileDeleter: unable to remove file \"" + fileName + "\": " + e.toString() + "; Will re-try later."); + message("IndexFileDeleter: unable to remove file \"" + fileName +"\": "+getDirName(fileName)+" \" "+ e.toString() + "; Will re-try later."); } if (deletable == null) { deletable = new ArrayList(); Index: src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- src/java/org/apache/lucene/index/IndexWriter.java (revision 789908) +++ src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -20,6 +20,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Document; import org.apache.lucene.index.DocumentsWriter.IndexingChain; +import org.apache.lucene.index.MergePolicy.OneMerge; import org.apache.lucene.search.Similarity; import org.apache.lucene.search.Query; import org.apache.lucene.store.Directory; @@ -28,6 +29,8 @@ import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.BufferedIndexInput; +import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.store.FileSwitchDirectory; import org.apache.lucene.util.Constants; import java.io.File; @@ -219,7 +222,9 @@ * @see #setDefaultWriteLockTimeout */ public static long WRITE_LOCK_TIMEOUT = 1000; - + + public static boolean GLOBALNRT = true; + private long writeLockTimeout = WRITE_LOCK_TIMEOUT; /** @@ -281,6 +286,18 @@ public final static int MAX_TERM_LENGTH = DocumentsWriter.MAX_TERM_LENGTH; /** + * File extensions for the FileSwitchDirectory used when NRT is on + */ + private final static Set SWITCH_FILE_EXTS = new HashSet(); + static { + SWITCH_FILE_EXTS.add("fdx"); + SWITCH_FILE_EXTS.add("fdt"); + SWITCH_FILE_EXTS.add("tvx"); + SWITCH_FILE_EXTS.add("tvf"); + SWITCH_FILE_EXTS.add("tvd"); + } + + /** * Default for {@link #getMaxSyncPauseSeconds}. On * Windows this defaults to 10.0 seconds; elsewhere it's * 0. 
@@ -307,7 +324,7 @@ private int messageID = -1; volatile private boolean hitOOM; - private Directory directory; // where this index resides + Directory directory; // where this index resides private Analyzer analyzer; // how to analyze text private Similarity similarity = Similarity.getDefault(); // how to normalize @@ -328,10 +345,10 @@ private SegmentInfos segmentInfos = new SegmentInfos(); // the segments - private DocumentsWriter docWriter; + DocumentsWriter docWriter; private IndexFileDeleter deleter; - private Set segmentsToOptimize = new HashSet(); // used by optimize to note those needing optimization + Set segmentsToOptimize = new HashSet(); // used by optimize to note those needing optimization private Lock writeLock; @@ -345,7 +362,11 @@ // merges private HashSet mergingSegments = new HashSet(); - private MergePolicy mergePolicy = new LogByteSizeMergePolicy(); + MergePolicy mergePolicy = new LogByteSizeMergePolicy("primary"); + MergePolicy ramMergePolicy = new LogDocMergePolicy("ram"); + NRTMergePolicy nrtMergePolicy; + RAMPolicy ramPolicy; + private long ramBufferSize = (long)(DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024); private MergeScheduler mergeScheduler = new ConcurrentMergeScheduler(); private LinkedList pendingMerges = new LinkedList(); private Set runningMerges = new HashSet(); @@ -363,6 +384,7 @@ private Thread writeThread; // non-null if any thread holds write lock final ReaderPool readerPool = new ReaderPool(); private int upgradeCount; + FileSwitchDirectory fileSwitchDirectory; // This is a "write once" variable (like the organic dye // on a DVD-R that may or may not be heated by a laser and @@ -375,6 +397,8 @@ // readers. private volatile boolean poolReaders; + private boolean flushToRAM; + /** * Expert: returns a readonly reader containing all * current updates. Flush is called automatically. This @@ -386,7 +410,7 @@ * guarantee on how quickly you can get a new reader after * making changes with IndexWriter. You'll have to * experiment in your situation to determine if it's - * faster enough. As this is a new and experimental + * fast enough. As this is a new and experimental * feature, please report back on your findings so we can * learn, improve and iterate.
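 * <p>A minimal near-realtime sketch, assuming the caller supplies a
 * Directory <code>dir</code>, an Analyzer <code>analyzer</code> and a
 * Document <code>doc</code>; the boolean flushToRam constructor argument
 * is the NRT switch added by this patch:
 * <pre>
 * IndexWriter writer = new IndexWriter(dir, analyzer,
 *     IndexWriter.MaxFieldLength.UNLIMITED, true); // flushToRam
 * writer.addDocument(doc);
 * IndexReader reader = writer.getReader(); // includes the RAM-resident segments
 * // ... run searches against reader ...
 * reader.close();
 * writer.commit(); // RAM segments are resolved to the primary directory
 * </pre>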

* @@ -434,7 +458,9 @@ // this method is called: poolReaders = true; - flush(true, true, true); + // when getReader is called we assume we want to try to flush to ram + // (we may not be able to flush to ram as there may not be enough available ram) + flush(true, true, true, true); // Prevent segmentInfos from changing while opening the // reader; in theory we could do similar retry logic, @@ -444,6 +470,30 @@ } } + /** + * Ram directory (switchDirectory) segments reader + */ + // for testing + IndexReader getRAMReader() throws IOException { + flush(true, true, true, true); + + synchronized(this) { + return new ReadOnlyDirectoryReader(this, getFSDSegmentInfos()); + } + } + + /** + * Primary directory segments reader + */ + //for testing + IndexReader getPrimaryReader() throws IOException { + flush(true, true, true, true); + + synchronized(this) { + return new ReadOnlyDirectoryReader(this, getInfosByDir(segmentInfos, directory)); + } + } + /** Holds shared SegmentReader instances. IndexWriter uses * SegmentReaders for 1) applying deletes, 2) doing * merges, 3) handing out a real-time reader. This pool @@ -598,7 +648,7 @@ sr.decRef(); } } - + // Returns a ref public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores) throws IOException { return get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE); @@ -681,7 +731,14 @@ assert readCount >= 0; notifyAll(); } - + + boolean isRAMMerge(MergePolicy.OneMerge merge) { + if (flushToRAM) { + if (merge.directory == fileSwitchDirectory) return true; + return false; + } else return false; + } + synchronized final boolean isOpen(boolean includePendingClose) { return !(closed || (includePendingClose && closing)); } @@ -726,11 +783,27 @@ * an exception if the mergePolicy is not a LogMergePolicy. */ private LogMergePolicy getLogMergePolicy() { - if (mergePolicy instanceof LogMergePolicy) - return (LogMergePolicy) mergePolicy; + MergePolicy mp = mergePolicy; + if (mp instanceof LogMergePolicy) + return (LogMergePolicy) mp; else throw new IllegalArgumentException("this method can only be called when the merge policy is the default LogMergePolicy"); } + + public void setRAMPolicy(RAMPolicy ramPolicy) { + ensureOpen(); + this.ramPolicy = ramPolicy; + } + + public RAMPolicy getRAMPolicy() { + ensureOpen(); + return ramPolicy; + } + + public MergePolicy getRAMMergePolicy() { + ensureOpen(); + return ramMergePolicy; + } /**

Get the current setting of whether newly flushed * segments will use the compound file format. Note that @@ -857,6 +930,38 @@ * Constructs an IndexWriter for the index in path. * Text will be analyzed with a. If create * is true, then a new, empty index will be created in + * path, replacing the index already there, + * if any. + * + *

NOTE: autoCommit (see above) is set to false with this + * constructor. + * + * @param path the path to the index directory + * @param a the analyzer to use + * @param create true to create the index or overwrite + * the existing one; false to append to the existing + * index + * @param mfl Maximum field length in number of tokens/terms: LIMITED, UNLIMITED, or user-specified + * via the MaxFieldLength constructor. + * @throws CorruptIndexException if the index is corrupt + * @throws LockObtainFailedException if another writer + * has this index open (write.lock could not + * be obtained) + * @throws IOException if the directory cannot be read/written to, or + * if it does not exist and create is + * false or if there is any other low-level + * IO error + */ + public IndexWriter(String path, Analyzer a, boolean create, MaxFieldLength mfl, boolean flushToRam) + throws CorruptIndexException, LockObtainFailedException, IOException { + init(FSDirectory.getDirectory(path), a, create, true, null, false, mfl.getLimit(), null, null); + } + + /** + * Constructs an IndexWriter for the index in path. + * Text will be analyzed with a. If create + * is true, then a new, empty index will be created in * path, replacing the index already there, if any. * * @param path the path to the index directory @@ -981,11 +1086,18 @@ * is true, then a new, empty index will be created in * d, replacing the index already there, if any. * + *
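+ * <p>Sketch of creating a fresh NRT-enabled index; the Directory
+ * <code>d</code>, Analyzer <code>a</code> and Document <code>doc</code>
+ * are assumed to be supplied by the caller:
+ * <pre>
+ * IndexWriter writer = new IndexWriter(d, a, true,
+ *     IndexWriter.MaxFieldLength.UNLIMITED, true); // create, flushToRam
+ * writer.addDocument(doc);
+ * writer.commit(); // RAM-resident segments are merged down to d first
+ * writer.close();
+ * </pre>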

NOTE: autoCommit (see above) is set to false with this + * constructor. + * * @param d the index directory * @param a the analyzer to use * @param create true to create the index or overwrite * the existing one; false to append to the existing * index + * @param mfl Maximum field length in number of terms/tokens: LIMITED, UNLIMITED, or user-specified + * via the MaxFieldLength constructor. + * @param flushToRam If many segments are being created opened for reading (@see #getReader) set this to true * @throws CorruptIndexException if the index is corrupt * @throws LockObtainFailedException if another writer * has this index open (write.lock could not @@ -994,6 +1106,31 @@ * if it does not exist and create is * false or if there is any other low-level * IO error + */ + public IndexWriter(Directory d, Analyzer a, boolean create, MaxFieldLength mfl, boolean flushToRam) + throws CorruptIndexException, LockObtainFailedException, IOException { + init(d, a, create, false, null, false, mfl.getLimit(), null, null, flushToRam, null); + } + + /** + * Constructs an IndexWriter for the index in d. + * Text will be analyzed with a. If create + * is true, then a new, empty index will be created in + * d, replacing the index already there, if any. + * + * @param d the index directory + * @param a the analyzer to use + * @param create true to create the index or overwrite + * the existing one; false to append to the existing + * index + * @throws CorruptIndexException if the index is corrupt + * @throws LockObtainFailedException if another writer + * has this index open (write.lock could not + * be obtained) + * @throws IOException if the directory cannot be read/written to, or + * if it does not exist and create is + * false or if there is any other low-level + * IO error * @deprecated This constructor will be removed in the 3.0 * release, and call {@link #commit()} when needed. * Use {@link #IndexWriter(Directory,Analyzer,boolean,MaxFieldLength)} instead. @@ -1141,8 +1278,15 @@ * already exist. Text will be analyzed with * a. * + *
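+ * <p>Sketch of the combined RAM budget when <code>flushToRam</code> is true
+ * (names are placeholders): the value passed to {@link #setRAMBufferSizeMB}
+ * covers both the in-memory indexing buffer and the internal RAMDirectory,
+ * and {@link #ramSizeInBytes} reports the sum of the two.
+ * <pre>
+ * IndexWriter writer = new IndexWriter(d, a,
+ *     IndexWriter.MaxFieldLength.UNLIMITED, true); // flushToRam
+ * writer.setRAMBufferSizeMB(32.0);     // shared buffer + RAM dir budget
+ * writer.addDocument(doc);
+ * long used = writer.ramSizeInBytes(); // buffered docs + RAM-resident segments
+ * </pre>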

NOTE: autoCommit (see above) is set to false with this + * constructor. + * * @param d the index directory * @param a the analyzer to use + * @param mfl Maximum field length in number of terms/tokens: LIMITED, UNLIMITED, or user-specified + * via the MaxFieldLength constructor. + * @param flushToRam If many segments are being created opened for reading (@see #getReader) set this to true * @throws CorruptIndexException if the index is corrupt * @throws LockObtainFailedException if another writer * has this index open (write.lock could not @@ -1150,6 +1294,37 @@ * @throws IOException if the directory cannot be * read/written to or if there is any other low-level * IO error + */ + public IndexWriter(Directory d, Analyzer a, MaxFieldLength mfl, boolean flushToRam) + throws CorruptIndexException, LockObtainFailedException, IOException { + init(d, a, false, null, false, mfl.getLimit(), null, null, flushToRam); + } + + IndexWriter(Directory d, Analyzer a, MaxFieldLength mfl, boolean flushToRam, Directory ramDir) + throws CorruptIndexException, LockObtainFailedException, IOException { + init(d, a, false, null, false, mfl.getLimit(), null, null, flushToRam, ramDir); + } + + IndexWriter(Directory d, Analyzer a, MaxFieldLength mfl, boolean flushToRam, Directory ramDir, IndexDeletionPolicy deletionPolicy) + throws CorruptIndexException, LockObtainFailedException, IOException { + init(d, a, false, deletionPolicy, false, mfl.getLimit(), null, null, flushToRam, ramDir); + } + + /** + * Constructs an IndexWriter for the index in + * d, first creating it if it does not + * already exist. Text will be analyzed with + * a. + * + * @param d the index directory + * @param a the analyzer to use + * @throws CorruptIndexException if the index is corrupt + * @throws LockObtainFailedException if another writer + * has this index open (write.lock could not + * be obtained) + * @throws IOException if the directory cannot be + * read/written to or if there is any other low-level + * IO error * @deprecated This constructor will be removed in the 3.0 release. 
* Use {@link * #IndexWriter(Directory,Analyzer,MaxFieldLength)} @@ -1415,21 +1590,43 @@ } private void init(Directory d, Analyzer a, boolean closeDir, IndexDeletionPolicy deletionPolicy, - boolean autoCommit, int maxFieldLength, IndexingChain indexingChain, IndexCommit commit) + boolean autoCommit, int maxFieldLength, IndexingChain indexingChain, IndexCommit commit) throws CorruptIndexException, LockObtainFailedException, IOException { + init(d, a, closeDir, deletionPolicy, autoCommit, maxFieldLength, indexingChain, commit, false); + } + + private void init(Directory d, Analyzer a, boolean closeDir, IndexDeletionPolicy deletionPolicy, + boolean autoCommit, int maxFieldLength, IndexingChain indexingChain, IndexCommit commit, boolean flushToRam) + throws CorruptIndexException, LockObtainFailedException, IOException { + init(d, a, closeDir, deletionPolicy, autoCommit, maxFieldLength, indexingChain, commit, flushToRam, (Directory)null); + } + + private void init(Directory d, Analyzer a, boolean closeDir, IndexDeletionPolicy deletionPolicy, + boolean autoCommit, int maxFieldLength, IndexingChain indexingChain, IndexCommit commit, boolean flushToRam, Directory ramDir) + throws CorruptIndexException, LockObtainFailedException, IOException { if (IndexReader.indexExists(d)) { - init(d, a, false, closeDir, deletionPolicy, autoCommit, maxFieldLength, indexingChain, commit); + init(d, a, false, closeDir, deletionPolicy, autoCommit, maxFieldLength, indexingChain, commit, flushToRam, ramDir); } else { - init(d, a, true, closeDir, deletionPolicy, autoCommit, maxFieldLength, indexingChain, commit); + init(d, a, true, closeDir, deletionPolicy, autoCommit, maxFieldLength, indexingChain, commit, flushToRam, ramDir); } } + + private void init(Directory d, Analyzer a, final boolean create, boolean closeDir, + IndexDeletionPolicy deletionPolicy, boolean autoCommit, int maxFieldLength, + IndexingChain indexingChain, IndexCommit commit) throws CorruptIndexException, LockObtainFailedException, IOException { + init(d, a, create, closeDir, deletionPolicy, autoCommit, maxFieldLength, indexingChain, commit, false, (Directory)null); + } private void init(Directory d, Analyzer a, final boolean create, boolean closeDir, IndexDeletionPolicy deletionPolicy, boolean autoCommit, int maxFieldLength, - IndexingChain indexingChain, IndexCommit commit) + IndexingChain indexingChain, IndexCommit commit, boolean flushToRAM, Directory ramDir) throws CorruptIndexException, LockObtainFailedException, IOException { this.closeDir = closeDir; directory = d; + if (GLOBALNRT) flushToRAM = true; + this.flushToRAM = flushToRAM; + if (ramDir == null) ramDir = new RAMDirectory(); + this.fileSwitchDirectory = new FileSwitchDirectory(SWITCH_FILE_EXTS, directory, ramDir, false); analyzer = a; setMessageID(defaultInfoStream); this.maxFieldLength = maxFieldLength; @@ -1501,13 +1698,25 @@ this.autoCommit = autoCommit; setRollbackSegmentInfos(segmentInfos); - docWriter = new DocumentsWriter(directory, this, indexingChain); + docWriter = new DocumentsWriter(this, indexingChain); docWriter.setInfoStream(infoStream); docWriter.setMaxFieldLength(maxFieldLength); - + ramPolicy = new RAMPolicy(this); + if (mergePolicy instanceof LogMergePolicy) { + ((LogMergePolicy)mergePolicy).setDirectory(directory); + } + if (ramMergePolicy instanceof LogMergePolicy) { + ((LogMergePolicy)ramMergePolicy).setDirectory(fileSwitchDirectory); + } + if (flushToRAM) { + nrtMergePolicy = new NRTMergePolicy(this); + 
((LogMergePolicy)ramMergePolicy).setUseCompoundDocStore(false); + ((LogMergePolicy)ramMergePolicy).setUseCompoundFile(false); + setRAMBufferSizeMB(IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB); // initialize ram dir size + } // Default deleter (for backwards compatibility) is // KeepOnlyLastCommitDeleter: - deleter = new IndexFileDeleter(directory, + deleter = new IndexFileDeleter(fileSwitchDirectory, deletionPolicy == null ? new KeepOnlyLastCommitDeletionPolicy() : deletionPolicy, segmentInfos, infoStream, docWriter); @@ -1531,17 +1740,67 @@ throw e; } } - + + Directory getFileSwitchDirectory() { + return fileSwitchDirectory; + } + + synchronized long getRAMDirSize() { + return ((RAMDirectory)fileSwitchDirectory.getSecondaryDir()).sizeInBytes(); + } + + synchronized long size(Directory dir) throws IOException { + String[] files = dir.listAll(); + long c = 0; + for (int x=0; x < files.length; x++) { + c +=dir.fileLength(files[x]); + } + return c; + } + + /** + * Returns the RAMDirectory created internally and used + * by the FileSwitchDirectory. + * @return + */ + public Directory getRAMDirectory() { + return fileSwitchDirectory.getSecondaryDir(); + } + private synchronized void setRollbackSegmentInfos(SegmentInfos infos) { rollbackSegmentInfos = (SegmentInfos) infos.clone(); - assert !rollbackSegmentInfos.hasExternalSegments(directory); + assert !rollbackSegmentInfos.hasExternalSegments(fileSwitchDirectory); rollbackSegments = new HashMap(); final int size = rollbackSegmentInfos.size(); for(int i=0;iDetermines the largest segment (measured by * document count) that may be merged with other segments. * Small values (e.g., less than 10,000) are best for @@ -1754,7 +2015,15 @@ if (mb == DISABLE_AUTO_FLUSH && getMaxBufferedDocs() == DISABLE_AUTO_FLUSH) throw new IllegalArgumentException( "at least one of ramBufferSize and maxBufferedDocs must be enabled"); - docWriter.setRAMBufferSizeMB(mb); + long value = (long)(mb*(double)1024*(double)1024); + if (flushToRAM) { + try { + ramPolicy.setTotalMax(value); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + ramBufferSize = value; if (infoStream != null) message("setRAMBufferSizeMB " + mb); } @@ -1763,7 +2032,7 @@ * Returns the value set by {@link #setRAMBufferSizeMB} if enabled. 
*/ public double getRAMBufferSizeMB() { - return docWriter.getRAMBufferSizeMB(); + return ramBufferSize / 1024 / 1024; } /** @@ -1891,10 +2160,11 @@ private void messageState() { message("setInfoStream: dir=" + directory + + " fsd="+fileSwitchDirectory+ " autoCommit=" + autoCommit + " mergePolicy=" + mergePolicy + " mergeScheduler=" + mergeScheduler + - " ramBufferSizeMB=" + docWriter.getRAMBufferSizeMB() + + " ramBufferSizeMB=" + getRAMBufferSizeMB() + " maxBufferedDocs=" + docWriter.getMaxBufferedDocs() + " maxBuffereDeleteTerms=" + docWriter.getMaxBufferedDeleteTerms() + " maxFieldLength=" + maxFieldLength + @@ -2068,13 +2338,14 @@ if (!hitOOM) { flush(waitForMerges, true, true); } - + resolveRAMSegments(); if (waitForMerges) // Give merge scheduler last chance to run, in case // any pending merges are waiting: - mergeScheduler.merge(this); + scheduleMerge(); mergePolicy.close(); + if (ramMergePolicy != null) ramMergePolicy.close(); finishMerges(waitForMerges); stopMerges = true; @@ -2097,8 +2368,11 @@ deleter.close(); } - if (closeDir) + if (closeDir) { directory.close(); + Directory ramDir = fileSwitchDirectory.getSecondaryDir(); + ramDir.close(); + } if (writeLock != null) { writeLock.release(); // release write lock @@ -2143,8 +2417,7 @@ } } - useCompoundDocStore = mergePolicy.useCompoundDocStore(segmentInfos); - + useCompoundDocStore = getActiveMergePolicy().useCompoundDocStore(segmentInfos, directory); if (useCompoundDocStore && docStoreSegment != null && docWriter.closedFiles().size() != 0) { // Now build compound doc store file @@ -2528,6 +2801,22 @@ handleOOM(oom, "updateDocument"); } } + + //for test purpose + final synchronized SegmentInfos getSegmentInfos() { + return segmentInfos; + } + + final synchronized SegmentInfos getSegmentInfos(Directory dir) { + SegmentInfos dirInfos = new SegmentInfos(); + for (int x=0; x < segmentInfos.size(); x++) { + SegmentInfo info = segmentInfos.info(x); + if (info.dir == dir) { + dirInfos.add(info); + } + } + return dirInfos; + } // for test purpose final synchronized int getSegmentCount(){ @@ -2687,7 +2976,7 @@ if (infoStream != null) message("optimize: index now " + segString()); - flush(true, false, true); + flush(true, false, true, true); // if there's room, flush the buffer to the ramdir synchronized(this) { resetMergeExceptions(); @@ -2730,7 +3019,7 @@ for(int i=0;i 0) { + MergePolicy.OneMerge allRamInfosMerge = new MergePolicy.OneMerge(ramInfos, ramInfos, getUseCompoundFile(), directory, this); + spec = new MergePolicy.MergeSpecification(this); + spec.merges.add(allRamInfosMerge); + } + done = true; + if (spec == null) continue; + for(int i=0; i < spec.merges.size(); i++) { + final MergePolicy.OneMerge newMerge = (MergePolicy.OneMerge)spec.merges.get(i); + done = false; + //newMerge.directory = directory; + // Returns true if no running merge conflicts + // with this one (and, records this merge as + // pending), ie, this segment is not currently + // being merged: + if (registerMerge(newMerge)) { + merge = newMerge; + + // If this segment is not currently being + // merged, then advance it to running & run + // the merge ourself (below): + pendingMerges.remove(merge); + runningMerges.add(merge); + break; + } + } + + if (!done && merge == null) + // We are not yet done (FSD segments still + // exist in segmentInfos), yet, all such segments + // are currently "covered" by a pending or running + // merge. 
We now try to grab any pending merge + // that involves FSD segments: + + // we get merges that have ram segments, or are going to ram + // and merge all of them until they're gone + // TODO: ideally we could abort ram -> ram merges however + // they should be fast so it shouldn't matter + merge = getNextMerge(fileSwitchDirectory, fileSwitchDirectory); + + if (!done && merge == null) + // We are not yet done, and, all external segments + // fall under merges that the merge scheduler is + // currently running. So, we now wait and check + // back to see if the merge has completed. + doWait(); + } + + if (merge != null) { + any = true; + merge(merge); + } + } + + if (any) + // Sometimes, on copying an external segment over, + // more merges may become necessary: + scheduleMerge(); + } + /* If any of our segments are using a directory != ours * then we have to either copy them over one by one, merge * them (if merge policy has chosen to) or wait until @@ -3641,10 +4117,10 @@ done = true; for(int i=0;iabove for details.

* - * @deprecated please call {@link #commit()}) instead - * * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ @@ -3949,8 +4423,11 @@ if (infoStream != null) message("prepareCommit: flush"); - flush(true, true, true); - + flush(true, true, true, false); + // merge ram segments to disk synchronously + // also waits for all ram merges to complete + resolveRAMSegments(); + checkpoint(); startCommit(0, commitUserData); } @@ -4025,7 +4502,7 @@ try { if (infoStream != null) message("commit: pendingCommit != null"); - pendingCommit.finishCommit(directory); + pendingCommit.finishCommit(fileSwitchDirectory); if (infoStream != null) message("commit: wrote segments file \"" + pendingCommit.getCurrentSegmentFileName() + "\""); lastCommitChangeCount = pendingCommitChangeCount; @@ -4041,13 +4518,41 @@ } else if (infoStream != null) message("commit: pendingCommit == null; skip"); - if (infoStream != null) - message("commit: done"); + message("commit: done ramDirSize:"+size(getRAMDirectory())); } + public static SegmentInfos getInfosByDir(SegmentInfos infos, Directory dir) throws IOException { + SegmentInfos dirInfos = new SegmentInfos(); + for (int x=0; x < infos.size(); x++) { + SegmentInfo info = infos.info(x); + if (info.dir == dir) { + dirInfos.add(info); + } + } + return dirInfos; + } + + synchronized SegmentInfos getFSDSegmentInfos() throws IOException { + return getInfosByDir(segmentInfos, fileSwitchDirectory); + } + /** - * Flush all in-memory buffered udpates (adds and deletes) + * Flush all in-memory buffered updates (adds and deletes) + * to the Directory. By default we flush to the ram dir + * @param triggerMerge if true, we may merge segments (if + * deletes or docs were flushed) if necessary + * @param flushDocStores if false we are allowed to keep + * doc stores open to share with the next segment + * @param flushDeletes whether pending deletes should also + * be flushed + */ + protected final void flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { + flush(triggerMerge, flushDocStores, flushDeletes, (flushToRAM ? true : false)); + } + + /** + * Flush all in-memory buffered updates (adds and deletes) * to the Directory. 
* @param triggerMerge if true, we may merge segments (if * deletes or docs were flushed) if necessary @@ -4056,28 +4561,33 @@ * @param flushDeletes whether pending deletes should also * be flushed */ - protected final void flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { - // We can be called during close, when closing==true, so we must pass false to ensureOpen: + protected final void flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes, boolean allowFlushToRAM) throws CorruptIndexException, IOException { ensureOpen(false); - if (doFlush(flushDocStores, flushDeletes) && triggerMerge) + + if (doFlush(flushDocStores, flushDeletes, allowFlushToRAM) && triggerMerge) { + message("flush maybe merge"); maybeMerge(); + } } - // TODO: this method should not have to be entirely - // synchronized, ie, merges should be allowed to commit - // even while a flush is happening - private synchronized final boolean doFlush(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { + private synchronized final boolean doFlush(boolean flushDocStores, boolean flushDeletes, boolean allowFlushToRAM) throws CorruptIndexException, IOException { try { - return doFlushInternal(flushDocStores, flushDeletes); + return doFlushInternal(flushDocStores, flushDeletes, allowFlushToRAM); } finally { docWriter.clearFlushPending(); } } + // TODO: this method should not have to be entirely + // synchronized, ie, merges should be allowed to commit + // even while a flush is happening + private synchronized final boolean doFlush(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { + return doFlush(flushDocStores, flushDeletes, (flushToRAM ? true : false)); + } // TODO: this method should not have to be entirely // synchronized, ie, merges should be allowed to commit // even while a flush is happening - private synchronized final boolean doFlushInternal(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { + private synchronized final boolean doFlushInternal(boolean flushDocStores, boolean flushDeletes, boolean allowFlushToRAM) throws CorruptIndexException, IOException { if (hitOOM) { throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot flush"); @@ -4160,6 +4670,10 @@ // If we are flushing docs, segment must not be null: assert segment != null || !flushDocs; + Directory flushToDir = directory; + if (allowFlushToRAM && flushToRAM) { + flushToDir = fileSwitchDirectory; + } if (flushDocs) { @@ -4167,7 +4681,7 @@ final int flushedDocCount; try { - flushedDocCount = docWriter.flush(flushDocStores); + flushedDocCount = docWriter.flush(flushDocStores, flushToDir); success = true; } finally { if (!success) { @@ -4193,7 +4707,7 @@ // successfully. 
newSegment = new SegmentInfo(segment, flushedDocCount, - directory, false, true, + flushToDir, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, docWriter.hasProx()); @@ -4207,11 +4721,11 @@ checkpoint(); } - if (flushDocs && mergePolicy.useCompoundFile(segmentInfos, newSegment)) { + if (flushDocs && getActiveMergePolicy().useCompoundFile(segmentInfos, newSegment)) { // Now build compound file boolean success = false; try { - docWriter.createCompoundFile(segment); + docWriter.createCompoundFile(segment, directory); success = true; } finally { if (!success) { @@ -4251,7 +4765,7 @@ */ public final long ramSizeInBytes() { ensureOpen(); - return docWriter.getRAMUsed(); + return docWriter.getRAMUsed() + ramPolicy.ramDirSize; } /** Expert: Return the number of documents currently @@ -4260,25 +4774,53 @@ ensureOpen(); return docWriter.getNumDocsInRAM(); } + + int ensureContiguousMerge(SegmentInfos segments) { + if (segments.size() == 0) return 0; + int sisSize = segmentInfos.size(); + + SegmentInfo in = segments.info(0); + + int first = segmentInfos.indexOf(in); + if (first == -1) + throw new MergePolicy.MergeException("could not find segment " + segments.info(0).name + " in current index " + segString(), fileSwitchDirectory); - private int ensureContiguousMerge(MergePolicy.OneMerge merge) { + final int numSegments = segmentInfos.size(); + + final int numSegmentsToMerge = segments.size(); + for(int i=0;i= numSegments || !segmentInfos.info(first+i).equals(info)) { + if (segmentInfos.indexOf(info) == -1) + throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the current index " + segString(), fileSwitchDirectory); + else + throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + segments + " vs " + segString() + "), which IndexWriter (currently) cannot handle", + fileSwitchDirectory); + } + } + + return first; + } + + int ensureContiguousMerge(MergePolicy.OneMerge merge) { + + int first = merge.fromInfos.indexOf(merge.segments.info(0)); if (first == -1) - throw new MergePolicy.MergeException("could not find segment " + merge.segments.info(0).name + " in current index " + segString(), directory); + throw new MergePolicy.MergeException("could not find segment " + merge.segments.info(0).name + " in current index " + segString(), fileSwitchDirectory); - final int numSegments = segmentInfos.size(); + final int numSegments = merge.fromInfos.size(); final int numSegmentsToMerge = merge.segments.size(); for(int i=0;i= numSegments || !segmentInfos.info(first+i).equals(info)) { - if (segmentInfos.indexOf(info) == -1) - throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the current index " + segString(), directory); + if (first + i >= numSegments || !merge.fromInfos.info(first+i).equals(info)) { + if (merge.fromInfos.indexOf(info) == -1) + throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the current index " + segString(), fileSwitchDirectory); else - throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge.segString(directory) + " vs " + segString() + "), which IndexWriter (currently) cannot handle", - directory); + throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge.segString() + " vs " + segString() + "), which IndexWriter (currently) cannot handle", + fileSwitchDirectory); } } @@ -4301,7 +4843,7 
@@ final SegmentInfos sourceSegments = merge.segments; if (infoStream != null) - message("commitMergeDeletes " + merge.segString(directory)); + message("commitMergeDeletes " + merge.segString()); // Carefully merge deletes that occurred after we // started merging: @@ -4370,7 +4912,7 @@ } if (infoStream != null) - message("commitMerge: " + merge.segString(directory) + " index=" + segString()); + message("commitMerge: " + merge.segString() + " index=" + segString()); assert merge.registerDone; @@ -4382,7 +4924,7 @@ // abort this merge if (merge.isAborted()) { if (infoStream != null) - message("commitMerge: skipping merge " + merge.segString(directory) + ": it was aborted"); + message("commitMerge: skipping merge " + merge.segString() + ": it was aborted"); deleter.refresh(merge.info.name); return false; @@ -4411,12 +4953,20 @@ } } } - + SegmentInfo firstInfo = merge.segments.info(0); + int idx = segmentInfos.indexOf(firstInfo); merge.info.setHasProx(merger.hasProx()); - - segmentInfos.subList(start, start + merge.segments.size()).clear(); + + // remove the newly merged infos individually + for (int x=0; x < merge.segments.size(); x++) { + SegmentInfo info = merge.segments.info(x); + segmentInfos.remove(info); + } + //segmentInfos.subList(start, start + merge.segments.size()).clear(); assert !segmentInfos.contains(merge.info); - segmentInfos.add(start, merge.info); + + segmentInfos.add(idx, merge.info); + //segmentInfos.add(start, merge.info); // Must note the change to segmentInfos so any commits // in-flight don't lose it: @@ -4440,7 +4990,7 @@ final private void handleMergeException(Throwable t, MergePolicy.OneMerge merge) throws IOException { if (infoStream != null) { - message("handleMergeException: merge=" + merge.segString(directory) + " exc=" + t); + message("handleMergeException: merge=" + merge.segString() + " exc=" + t); } // Set the exception on the merge, so if @@ -4485,7 +5035,7 @@ mergeInit(merge); if (infoStream != null) - message("now merge\n merge=" + merge.segString(directory) + "\n merge=" + merge + "\n index=" + segString()); + message("now merge\n merge=" + merge.segString() + "\n merge=" + merge + "\n index=" + segString()); mergeMiddle(merge); mergeSuccess(merge); @@ -4533,19 +5083,26 @@ if (stopMerges) { merge.abort(); - throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString(directory)); + throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString()); } - + assert merge.directory != null; final int count = merge.segments.size(); boolean isExternal = false; + // all the segment directories should be the same + Directory segDir = null; for(int i=0;iThis class implements a {@link MergePolicy} that tries * to merge segments into levels of exponentially * increasing size, where each level has fewer segments than @@ -63,17 +65,28 @@ /* TODO 3.0: change this default to true */ protected boolean calibrateSizeByDeletes = false; - private boolean useCompoundFile = true; - private boolean useCompoundDocStore = true; - private IndexWriter writer; - + protected boolean useCompoundFile = true; + protected boolean useCompoundDocStore = true; + protected IndexWriter writer; + protected String type = ""; + protected Directory directory; + + public void setDirectory(Directory directory) { + this.directory = directory; + } + + public void setType(String type) { + this.type = type; + } + protected boolean verbose() { return writer != null && writer.verbose(); } - private void message(String message) { - if (verbose()) - 
writer.message("LMP: " + message); + protected void message(String message) { + if (verbose()) { + writer.message("LMP("+type+"): " + message); + } } /**

Returns the number of segments that are merged at @@ -116,6 +129,11 @@ return useCompoundFile; } + //Javadoc inherited + public boolean useCompoundDocStore(SegmentInfos infos, Directory dir) { + return useCompoundDocStore; + } + // Javadoc inherited public boolean useCompoundDocStore(SegmentInfos infos) { return useCompoundDocStore; @@ -226,12 +244,12 @@ if (last > 0) { - spec = new MergeSpecification(); + spec = new MergeSpecification(writer); // First, enroll all "full" merges (size // mergeFactor) to potentially be run concurrently: while (last - maxNumSegments + 1 >= mergeFactor) { - spec.add(new OneMerge(infos.range(last-mergeFactor, last), useCompoundFile)); + spec.add(new OneMerge(infos.range(last-mergeFactor, last), infos, useCompoundFile, directory, writer)); last -= mergeFactor; } @@ -243,7 +261,7 @@ // Since we must optimize down to 1 segment, the // choice is simple: if (last > 1 || !isOptimized(writer, infos.info(0))) - spec.add(new OneMerge(infos.range(0, last), useCompoundFile)); + spec.add(new OneMerge(infos.range(0, last), infos, useCompoundFile, directory, writer)); } else if (last > maxNumSegments) { // Take care to pick a partial merge that is @@ -271,7 +289,7 @@ } } - spec.add(new OneMerge(infos.range(bestStart, bestStart+finalMergeSize), useCompoundFile)); + spec.add(new OneMerge(infos.range(bestStart, bestStart+finalMergeSize), infos, useCompoundFile, directory, writer)); } } @@ -299,7 +317,7 @@ if (verbose()) message("findMergesToExpungeDeletes: " + numSegments + " segments"); - MergeSpecification spec = new MergeSpecification(); + MergeSpecification spec = new MergeSpecification(writer); int firstSegmentWithDeletions = -1; for(int i=0;i= maxMergeSize || sizeDocs(info) >= maxMergeDocs); + //rightDir |= info.dir == directory; } - if (!anyTooLarge) { + if (!anyTooLarge) { //&& rightDir) { if (spec == null) - spec = new MergeSpecification(); + spec = new MergeSpecification(writer); if (verbose()) message(" " + start + " to " + end + ": add this merge"); - spec.add(new OneMerge(infos.range(start, end), useCompoundFile)); + spec.add(new OneMerge(infos.range(start, end), infos, useCompoundFile, directory, writer)); } else if (verbose()) - message(" " + start + " to " + end + ": contains segment over maxMergeSize or maxMergeDocs; skipping"); + message(" " + start + " to " + end + ": contains segment over maxMergeSize or maxMergeDocs or not right dir; skipping"); start = end; end = start + mergeFactor; Index: src/java/org/apache/lucene/index/MergeDocIDRemapper.java =================================================================== --- src/java/org/apache/lucene/index/MergeDocIDRemapper.java (revision 789908) +++ src/java/org/apache/lucene/index/MergeDocIDRemapper.java (working copy) @@ -45,7 +45,8 @@ int numDocs = 0; for(int j=0;j 0) b.append(' '); - b.append(segments.info(i).segString(dir)); + b.append(segments.info(i).segString(directory, writer)); } - if (info != null) - b.append(" into ").append(info.name); + if (info != null) { + b.append(" into ").append(info.segString(info.dir, writer)); + } if (optimize) b.append(" [optimize]"); if (mergeDocStores) { @@ -147,25 +155,43 @@ */ public static class MergeSpecification { - + IndexWriter writer; /** * The subset of segments to be included in the primitive merge. 
*/ public List merges = new ArrayList(); + public MergeSpecification(IndexWriter writer) { + this.writer = writer; + } + public void add(OneMerge merge) { merges.add(merge); } - public String segString(Directory dir) { + /** + * @deprecated please call {@link #segString()}) instead as merge has the directory + * @param directory + * @return + */ + public String segString(Directory directory) { StringBuffer b = new StringBuffer(); b.append("MergeSpec:\n"); final int count = merges.size(); for(int i=0;iExpert: {@link IndexWriter} uses an instance * implementing this interface to execute the merges * selected by a {@link MergePolicy}. The default Index: src/java/org/apache/lucene/index/NRTMergePolicy.java =================================================================== --- src/java/org/apache/lucene/index/NRTMergePolicy.java (revision 0) +++ src/java/org/apache/lucene/index/NRTMergePolicy.java (revision 0) @@ -0,0 +1,240 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.lucene.store.Directory; + +/** + * Merge policy that takes into account the regular IndexWriter merge policy + * (that operates on the primary dir) and the ram merge policy which determines + * merges for the ram directory. + * + * Optimize merges all segments (ram and primary dir) to the primary dir. + * + * Expunge deletes removes deletes from the ram dir and the primary dir. + * + * If the ramDir is over it's limit, or a new merge would put the ramDir + * over it's limit, all ram segments are merged into a single primaryDir + * segment. 
+ * + */ +public class NRTMergePolicy extends MergePolicy { + private static final int FIND_MERGES = 1; + private static final int FIND_MERGES_EXPUNGE_DELETES = 2; + private static final int FIND_MERGES_OPTIMIZE = 3; + private IndexWriter writer; + + public NRTMergePolicy(IndexWriter writer) { + this.writer = writer; + } + + public boolean useCompoundDocStore(SegmentInfos segments) { + throw new UnsupportedOperationException(""); + } + + /** + * If the newSegment is going to ram, get useCompoundFile from the + * ramMergePolicy otherwise use the mergePolicy + */ + public boolean useCompoundFile(SegmentInfos segments, SegmentInfo newSegment) { + assert newSegment.dir != null; + if (newSegment.dir == writer.directory) { + return writer.mergePolicy.useCompoundFile(segments, newSegment); + } else if (newSegment.dir == writer.fileSwitchDirectory) { + return writer.ramMergePolicy.useCompoundFile(segments, newSegment); + } + throw new RuntimeException("unknown directory"); + } + + public boolean useCompoundDocStore(SegmentInfos segments, Directory dir) { + assert dir != null; + if (dir == writer.directory) { + return writer.mergePolicy.useCompoundDocStore(segments); + } else { + return writer.ramMergePolicy.useCompoundDocStore(segments); + } + } + + public void close() { + } + + public MergeSpecification findMergesForOptimize(SegmentInfos segmentInfos, IndexWriter writer, int maxSegmentCount, Set segmentsToOptimize) + throws CorruptIndexException, IOException { + return findMerges(segmentInfos, writer, FIND_MERGES_OPTIMIZE, maxSegmentCount); + } + + public MergeSpecification findMerges(SegmentInfos segmentInfos, IndexWriter writer) throws CorruptIndexException, IOException { + return findMerges(segmentInfos, writer, FIND_MERGES, 0); + } + + private static boolean mergesCompound(MergeSpecification spec) { + if (spec.merges.size() == 0) + return false; + for (int x = 0; x < spec.merges.size(); x++) { + MergePolicy.OneMerge merge = (MergePolicy.OneMerge) spec.merges.get(x); + if (!merge.useCompoundFile) + return false; + } + return true; + } + + /** + * Create a mergeSpec based on the RAM dir usage, or the RAM dir usage given a potential + * merge. 
+ * @param segmentInfos + * @param writer + * @param type + * @param maxSegmentCount + * @return + * @throws CorruptIndexException + * @throws IOException + */ + protected MergeSpecification findMerges(SegmentInfos segmentInfos, IndexWriter writer, int type, int maxSegmentCount) + throws CorruptIndexException, IOException { + SegmentInfos infos = writer.getSegmentInfos(); + SegmentInfos primInfos = writer.getSegmentInfos(writer.directory); + SegmentInfos ramInfos = writer.getSegmentInfos(writer.fileSwitchDirectory); + + if (writer.ramMergePolicy instanceof LogMergePolicy) { + assert !((LogMergePolicy)writer.ramMergePolicy).useCompoundDocStore; + assert !((LogMergePolicy)writer.ramMergePolicy).useCompoundFile; + } + MergeSpecification ramSpec = null; + //message("ramInfos.size:"+ramInfos.size()+" primInfos.size:"+primInfos.size()); + if (ramInfos.size() > 0) { + boolean isRAMDirOver = writer.ramPolicy.getRAMDirSize() >= writer.ramPolicy.getRAMDirLimit(); + // if we're not optimizing we're trying to + // perform merges and writing them to the ram dir + // if the ram segments don't look like they will fit + // we instead merge the ram segments to primaryDir + boolean mergeRAMToPrimary = false; + if (isRAMDirOver) { + mergeRAMToPrimary = true; + } else { + if (type == FIND_MERGES) + ramSpec = writer.ramMergePolicy.findMerges(ramInfos, writer); + else if (type == FIND_MERGES_EXPUNGE_DELETES) + ramSpec = writer.ramMergePolicy.findMergesToExpungeDeletes(segmentInfos, writer); + // if the ramSpec send us over the ramDir limit, merge everything to ram + if (ramSpec != null) { + long ramSpecSize = 0; + for (int x = 0; x < ramSpec.merges.size(); x++) { + MergePolicy.OneMerge m = (MergePolicy.OneMerge) ramSpec.merges.get(x); + ramSpecSize += getSize(m); + } + // if the new ram segments and ramDirSize will go over the limit, then + // merge everything to primary + if ((ramSpecSize + writer.ramPolicy.getRAMDirSize()) >= writer.ramPolicy.getRAMDirLimit()) { + mergeRAMToPrimary = true; + } + } + } + message("mergeRAMToPrimary:"+mergeRAMToPrimary); + if (mergeRAMToPrimary) { + MergePolicy.OneMerge allRamInfosMerge = new MergePolicy.OneMerge(ramInfos, ramInfos, writer.getUseCompoundFile(), writer.directory, writer); + writer.ensureContiguousMerge(allRamInfosMerge); + ramSpec = new MergePolicy.MergeSpecification(writer); + ramSpec.merges.add(allRamInfosMerge); + } else if (ramSpec != null) { + // otherwise create a regular merge spec + assert !mergesCompound(ramSpec); + } + } + MergeSpecification spec = null; + if (type == FIND_MERGES) { + spec = writer.mergePolicy.findMerges(primInfos, writer); + } else if (type == FIND_MERGES_EXPUNGE_DELETES) { + spec = writer.mergePolicy.findMergesToExpungeDeletes(primInfos, writer); + } else if (type == FIND_MERGES_OPTIMIZE) { + spec = writer.mergePolicy.findMergesForOptimize(segmentInfos, writer, maxSegmentCount, writer.segmentsToOptimize); + } + try { + if (ramSpec != null) ensureContiguous(ramSpec); + } catch (MergePolicy.MergeException ex) { + String typeStr = "findMerges"; + if (type == FIND_MERGES_EXPUNGE_DELETES) { + typeStr = "findMergesToExpungeDeletes"; + } + IOException ioe = new IOException("ramSpec not contiguous "+typeStr); + //throw new MergePolicy.MergeException(""); + ioe.initCause(ex); + throw ioe; + } + if (spec != null) ensureContiguous(spec); + MergeSpecification ms = mergeSpecs(ramSpec, spec, writer); + assert infos.size() == writer.getSegmentInfos().size(); + return ms; + } + + private void ensureContiguous(MergeSpecification spec) { + for (int 
x=0; x < spec.merges.size(); x++) { + MergePolicy.OneMerge merge = (MergePolicy.OneMerge)spec.merges.get(x); + writer.ensureContiguousMerge(merge); + } + } + + public MergeSpecification findMergesToExpungeDeletes(SegmentInfos segmentInfos, IndexWriter writer) throws CorruptIndexException, + IOException { + return findMerges(segmentInfos, writer, FIND_MERGES_EXPUNGE_DELETES, -1); + } + + protected boolean verbose() { + return writer != null && writer.verbose(); + } + + protected void message(String message) { + if (verbose()) { + writer.message("NRTMP: " + message); + } + } + + private static long getSize(MergePolicy.OneMerge merge) throws IOException { + long total = 0; + for (int x = 0; x < merge.segments.size(); x++) { + SegmentInfo si = (SegmentInfo) merge.segments.get(x); + total += si.sizeInBytes(); + } + return total; + } + /** + private static void setDir(MergeSpecification spec, Directory dir) { + if (spec != null) { + for (int x = 0; x < spec.merges.size(); x++) { + MergePolicy.OneMerge merge = (MergePolicy.OneMerge) spec.merges.get(x); + //merge.directory = dir; + } + } + } + **/ + private static MergeSpecification mergeSpecs(MergeSpecification ms1, MergeSpecification ms2, IndexWriter writer) { + if (ms1 == null && ms2 == null) + return null; + if (ms1 != null && ms2 != null) + assert ms1.writer == ms2.writer; + MergeSpecification ms = new MergeSpecification(writer); + if (ms1 != null) + ms.merges.addAll(ms1.merges); + if (ms2 != null) + ms.merges.addAll(ms2.merges); + return ms; + } +} Index: src/java/org/apache/lucene/index/RAMPolicy.java =================================================================== --- src/java/org/apache/lucene/index/RAMPolicy.java (revision 0) +++ src/java/org/apache/lucene/index/RAMPolicy.java (revision 0) @@ -0,0 +1,149 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.text.NumberFormat; + +/** + * Dynamically calculates RAM limits. + */ +public class RAMPolicy { + public static boolean TRACE = false; + public static final double DEFAULT_PERCENT_OVER_LIMIT = 1.40; + public static final double DEFAULT_RAMDIR_PERCENT_LIMIT = 0.90; + long ramDirSize = 0; // current real size of the ram dir + double percentTemp = DEFAULT_PERCENT_OVER_LIMIT; + double ramDirPercentLimit = DEFAULT_RAMDIR_PERCENT_LIMIT; + long totalMax; + long tempMax; + NumberFormat nf = NumberFormat.getInstance(); + IndexWriter writer; + + public RAMPolicy(IndexWriter writer) throws IOException { + this.writer = writer; + // we default the total max to the ramBufferSize + setTotalMax((long)(writer.getRAMBufferSizeMB()*1024*1024)); + } + + public long getRAMSize() { + // TODO: may need to synchronize on docWriter? 
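+    // total NRT RAM in use = bytes currently buffered in DocumentsWriter
+    // plus bytes of segments already flushed to the RAM dir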
+ return writer.docWriter.numBytesUsed + getRAMDirSize(); + } + + public String getStatusMsg() throws IOException { + return "primDirSize:"+writer.size(writer.directory)+" numBytesUsed:"+toMB(writer.docWriter.numBytesUsed) + +" alloc:"+toMB(writer.docWriter.numBytesAlloc)//+" ramBufLimit:"+toMB(getRAMBufferLimit(1.0)) + +" ramDir:"+toMB(getRAMDirSize())+" ramUsed:"+toMB(getRAMSize())+" max:"+toMB(totalMax); + } + + /** + * Set the temporary percent of the total ram allowed + * @param totalMaxOver + */ + public void setPercentTemp(double percentTemp) { + synchronized (writer.docWriter) { + this.percentTemp = percentTemp; + tempMax = (long)(percentTemp * (double)totalMax); + } + } + + /** + * + * @param percent + * @return + */ + public long getRAMBufferLimit(double percent) { + long space = tempMax - ramDirSize; + return (long)((space/2)*percent); + } + + /** + * When ram dir is greater than or equals to this number, + * schedule all it's segments be merged to the primaryDir. + * @return + */ + public long getRAMDirLimit() { + // ramDirPercentLimit should make the limit lower than the actual so that + // we start writing to the primaryDir earlier + return (long)((double)(totalMax - writer.docWriter.numBytesUsed)*ramDirPercentLimit); + } + + String toMB(long v) { + return nf.format(v/1024./1024.); + } + + void printRAMDirUsage(String msg) throws IOException { + String prefix = ""; + if (msg != null && !msg.equals("")) { + prefix = msg + " "; + } + message(prefix+getStatusMsg()); + } + + protected boolean verbose() { + return writer != null && writer.verbose(); + } + + public long getRAMDirSize() { + return ramDirSize; + } + + /** + * @param total + */ + public void setTotalMax(long totalMax) throws IOException { + //synchronized (writer.docWriter) { + this.totalMax = totalMax; + if (TRACE) message("totalMax:"+toMB(totalMax)); + setPercentTemp(percentTemp); // given the new totalMax, set the new temp limit + writer.docWriter.setMaxRAM(totalMax); + //} + } + + protected void message(String message) { + if (verbose()) { + writer.message("RAMP: " + message); + } + } + + public final void pushRAMDirSize(long ramDirSize) { + this.ramDirSize = ramDirSize; + if (TRACE) message("pushRamDirSize:"+ramDirSize);//+" ramDirAllocSize:"+ramDirAllocSize); + } + + /** + * The allocated size is returned which most of the time is the + * same as the actual size. + * @return + * @throws IOException + */ + public final long getRAMBufferSize() throws IOException { + return writer.docWriter.numBytesUsed; + } + + /** + * The available is totalMax - (ramDirAllocSize + ramBufferSize) + * + * @return + */ + public final long getTotalAvailable() { + long avail = totalMax - (ramDirSize + writer.docWriter.numBytesUsed); + return avail; + } +} Index: src/java/org/apache/lucene/index/SegmentInfo.java =================================================================== --- src/java/org/apache/lucene/index/SegmentInfo.java (revision 789908) +++ src/java/org/apache/lucene/index/SegmentInfo.java (working copy) @@ -675,7 +675,7 @@ } /** Used for debugging */ - public String segString(Directory dir) { + public String segString(Directory dir, IndexWriter writer) { String cfs; try { if (getUseCompoundFile()) @@ -694,8 +694,8 @@ docStore = ""; return name + ":" + - cfs + - (this.dir == dir ? 
"" : "x") + + cfs +"."+ + writer.printDir(dir)+ docCount + docStore; } Index: src/java/org/apache/lucene/index/SegmentInfos.java =================================================================== --- src/java/org/apache/lucene/index/SegmentInfos.java (revision 789908) +++ src/java/org/apache/lucene/index/SegmentInfos.java (working copy) @@ -815,7 +815,7 @@ final int size = size(); for(int i=0;i= 10000.0) { - merge.checkAborted(dir); + merge.checkAborted(merge.directory); workCount = 0; } } Index: src/java/org/apache/lucene/index/SegmentReader.java =================================================================== --- src/java/org/apache/lucene/index/SegmentReader.java (revision 789908) +++ src/java/org/apache/lucene/index/SegmentReader.java (working copy) @@ -491,6 +491,8 @@ final Directory storeDir; if (si.getDocStoreOffset() != -1) { if (si.getDocStoreIsCompoundFile()) { + assert storeCFSReader == null; + storeCFSReader = new CompoundFileReader(directory(), si.getDocStoreSegment() + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION, readBufferSize); Index: src/java/org/apache/lucene/index/SerialMergeScheduler.java =================================================================== --- src/java/org/apache/lucene/index/SerialMergeScheduler.java (revision 789908) +++ src/java/org/apache/lucene/index/SerialMergeScheduler.java (working copy) @@ -19,10 +19,11 @@ import java.io.IOException; +import org.apache.lucene.store.Directory; + /** A {@link MergeScheduler} that simply does each merge * sequentially, using the current thread. */ -public class SerialMergeScheduler extends MergeScheduler { - +public class SerialMergeScheduler extends MergeScheduler { /** Just do the merges in sequence. We do this * "synchronized" so that even if the application is using * multiple threads, only one merge may run at a time. */ Index: src/java/org/apache/lucene/index/StoredFieldsWriter.java =================================================================== --- src/java/org/apache/lucene/index/StoredFieldsWriter.java (revision 789908) +++ src/java/org/apache/lucene/index/StoredFieldsWriter.java (working copy) @@ -20,6 +20,7 @@ import java.util.Map; import java.io.IOException; import org.apache.lucene.store.RAMOutputStream; +import org.apache.lucene.store.Directory; import org.apache.lucene.util.ArrayUtil; /** This is a DocFieldConsumer that writes stored fields. */ @@ -46,7 +47,7 @@ // It's possible that all documents seen in this segment // hit non-aborting exceptions, in which case we will // not have yet init'd the FieldsWriter: - initFieldsWriter(); + initFieldsWriter(state.directory); // Fill fdx file to include any final docs that we // skipped because they hit non-aborting exceptions @@ -57,12 +58,12 @@ fieldsWriter.flush(); } - private void initFieldsWriter() throws IOException { + private void initFieldsWriter(Directory directory) throws IOException { if (fieldsWriter == null) { final String docStoreSegment = docWriter.getDocStoreSegment(); if (docStoreSegment != null) { assert docStoreSegment != null; - fieldsWriter = new FieldsWriter(docWriter.directory, + fieldsWriter = new FieldsWriter(directory, docStoreSegment, fieldInfos); docWriter.addOpenFile(docStoreSegment + "." 
+ IndexFileNames.FIELDS_EXTENSION); @@ -75,7 +76,7 @@ synchronized public void closeDocStore(SegmentWriteState state) throws IOException { final int inc = state.numDocsInStore - lastDocID; if (inc > 0) { - initFieldsWriter(); + initFieldsWriter(state.directory); fill(state.numDocsInStore - docWriter.getDocStoreOffset()); } @@ -138,7 +139,11 @@ synchronized void finishDocument(PerDoc perDoc) throws IOException { assert docWriter.writer.testPoint("StoredFieldsWriter.finishDocument start"); - initFieldsWriter(); + // TODO: we need a way to pass the directory + // in here from segmentWriteState, though we can + // safely assume (because docstores always go to + // the primaryDir that this is ok) + initFieldsWriter(docWriter.writer.directory); fill(perDoc.docID); Index: src/java/org/apache/lucene/index/TermVectorsTermsWriter.java =================================================================== --- src/java/org/apache/lucene/index/TermVectorsTermsWriter.java (revision 789908) +++ src/java/org/apache/lucene/index/TermVectorsTermsWriter.java (working copy) @@ -19,6 +19,7 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RAMOutputStream; +import org.apache.lucene.store.Directory; import org.apache.lucene.util.ArrayUtil; import java.io.IOException; @@ -36,9 +37,14 @@ IndexOutput tvd; IndexOutput tvf; int lastDocID; + Directory dir; public TermVectorsTermsWriter(DocumentsWriter docWriter) { this.docWriter = docWriter; + // term vectors always go to the primary directory + // TODO: we should be getting the directory though from + // SegmentWriteState + this.dir = docWriter.writer.directory; } public TermsHashConsumerPerThread addThread(TermsHashPerThread termsHashPerThread) { @@ -152,9 +158,9 @@ // vector output files, we must abort this segment // because those files will be in an unknown // state: - tvx = docWriter.directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION); - tvd = docWriter.directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION); - tvf = docWriter.directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION); + tvx = dir.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION); + tvd = dir.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION); + tvf = dir.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION); tvx.writeInt(TermVectorsReader.FORMAT_CURRENT); tvd.writeInt(TermVectorsReader.FORMAT_CURRENT); Index: src/java/org/apache/lucene/store/Directory.java =================================================================== --- src/java/org/apache/lucene/store/Directory.java (revision 789908) +++ src/java/org/apache/lucene/store/Directory.java (working copy) @@ -67,7 +67,17 @@ { return list(); } - + + /** + * Returns true if the given directory is equal to or + * is contained within this directory. + * @param other Directory to compare against + * @return if this directory equals or contains the given directory + */ + public boolean contains(Directory other) { + return equals(other); + } + /** Returns true iff a file with the given name exists. 
*/ public abstract boolean fileExists(String name) throws IOException; Index: src/java/org/apache/lucene/store/FileSwitchDirectory.java =================================================================== --- src/java/org/apache/lucene/store/FileSwitchDirectory.java (revision 789908) +++ src/java/org/apache/lucene/store/FileSwitchDirectory.java (working copy) @@ -29,13 +29,19 @@ * to this class, and must allow multiple threads to call * contains at once. */ - public class FileSwitchDirectory extends Directory { private final Directory secondaryDir; private final Directory primaryDir; private final Set primaryExtensions; private boolean doClose; + /** + * + * @param primaryExtensions If files match these extensions they go to the primaryDir + * @param primaryDir + * @param secondaryDir + * @param doClose + */ public FileSwitchDirectory(Set primaryExtensions, Directory primaryDir, Directory secondaryDir, boolean doClose) { this.primaryExtensions = primaryExtensions; this.primaryDir = primaryDir; @@ -44,6 +50,22 @@ this.lockFactory = primaryDir.getLockFactory(); } + /** + * Returns true of the given directory equals the primaryDir + * or the secondaryDir. If the given dir is a FileSwitchDirectory + * return true if it contains our primaryDir or secondaryDir. + */ + public boolean contains(Directory other) { + if (other instanceof FileSwitchDirectory) { + FileSwitchDirectory otherFsd = (FileSwitchDirectory)other; + if (otherFsd.contains(primaryDir) || otherFsd.contains(secondaryDir)) { + return true; + } + return false; + } + return primaryDir.equals(other) || secondaryDir.equals(other); + } + public Directory getPrimaryDir() { return primaryDir; } @@ -84,7 +106,9 @@ return name.substring(i+1, name.length()); } - private Directory getDirectory(String name) { + public Directory getDirectory(String name) throws IOException { + if (primaryDir.fileExists(name)) return primaryDir; + if (secondaryDir.fileExists(name)) return secondaryDir; String ext = getExtension(name); if (primaryExtensions.contains(ext)) { return primaryDir; Index: src/test/org/apache/lucene/index/DocHelper.java =================================================================== --- src/test/org/apache/lucene/index/DocHelper.java (revision 789908) +++ src/test/org/apache/lucene/index/DocHelper.java (working copy) @@ -247,7 +247,7 @@ writer.setSimilarity(similarity); //writer.setUseCompoundFile(false); writer.addDocument(doc); - writer.flush(); + writer.commit(); SegmentInfo info = writer.newestSegment(); writer.close(); return info; Index: src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java =================================================================== --- src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java (revision 789908) +++ src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java (working copy) @@ -295,7 +295,11 @@ writer.addIndexesNoOptimize(new Directory[] { aux }); assertEquals(1040, writer.docCount()); - assertEquals(2, writer.getSegmentCount()); + if (writer.isFlushToRAM()) { + assertEquals(5, writer.getSegmentCount()); + } else { + assertEquals(2, writer.getSegmentCount()); + } assertEquals(1000, writer.getDocCount(0)); writer.close(); @@ -319,7 +323,11 @@ writer.addIndexesNoOptimize(new Directory[] { aux }); assertEquals(1032, writer.docCount()); - assertEquals(2, writer.getSegmentCount()); + if (writer.isFlushToRAM()) { + assertEquals(5, writer.getSegmentCount()); + } else { + assertEquals(2, writer.getSegmentCount()); + } assertEquals(1000, writer.getDocCount(0)); writer.close(); 
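[Illustrative sketch, not part of the patch] The test changes above and below exercise the flush-to-RAM (NRT) mode added by LUCENE-1313. Assuming the IndexWriter constructor overload that takes the RAM directory as its last argument, together with getReader() and setRAMMergePolicy(), behave as exercised by TestIndexWriterRAMDir further down in this patch, application-side usage might look roughly like this (the path, buffer size and field names are placeholders):

    // imports assumed: java.io.File, org.apache.lucene.index.*, org.apache.lucene.store.*,
    //                  org.apache.lucene.document.*, org.apache.lucene.analysis.WhitespaceAnalyzer
    Directory primaryDir = FSDirectory.getDirectory(new File("/tmp/nrt-index")); // placeholder path
    RAMDirectory ramDir = new RAMDirectory();            // fresh segments are flushed here first
    IndexWriter writer = new IndexWriter(primaryDir, new WhitespaceAnalyzer(),
        IndexWriter.MaxFieldLength.LIMITED, true, ramDir);
    writer.setRAMBufferSizeMB(16.0);                     // shared budget for the doc buffer + RAM segments
    LogDocMergePolicy ramMp = new LogDocMergePolicy();
    ramMp.setMergeFactor(2);                             // merge small in-RAM segments aggressively
    writer.setRAMMergePolicy(ramMp);

    Document doc = new Document();
    doc.add(new Field("id", "1", Field.Store.YES, Field.Index.NOT_ANALYZED));
    writer.addDocument(doc);                             // small update, flushed to ramDir first

    IndexReader r = writer.getReader();                  // NRT reader spanning RAM + primary segments
    // ... run searches against r ...
    r.close();

    writer.commit();                                     // remaining RAM segments move to primaryDir
    writer.close();

Once RAMPolicy reports the RAM directory over its limit, NRTMergePolicy schedules the RAM segments to be merged down to the primary directory, which is why several of the surrounding tests branch on writer.isFlushToRAM().
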
Index: src/test/org/apache/lucene/index/TestAtomicUpdate.java =================================================================== --- src/test/org/apache/lucene/index/TestAtomicUpdate.java (revision 789908) +++ src/test/org/apache/lucene/index/TestAtomicUpdate.java (working copy) @@ -138,7 +138,8 @@ } writer.commit(); - IndexReader r = IndexReader.open(directory); + Directory fsd = writer.getFileSwitchDirectory(); + IndexReader r = IndexReader.open(fsd); assertEquals(100, r.numDocs()); r.close(); @@ -150,11 +151,11 @@ threads[1] = indexerThread2; indexerThread2.start(); - SearcherThread searcherThread1 = new SearcherThread(directory, threads); + SearcherThread searcherThread1 = new SearcherThread(fsd, threads); threads[2] = searcherThread1; searcherThread1.start(); - SearcherThread searcherThread2 = new SearcherThread(directory, threads); + SearcherThread searcherThread2 = new SearcherThread(fsd, threads); threads[3] = searcherThread2; searcherThread2.start(); Index: src/test/org/apache/lucene/index/TestBackwardsCompatibility.java =================================================================== --- src/test/org/apache/lucene/index/TestBackwardsCompatibility.java (revision 789908) +++ src/test/org/apache/lucene/index/TestBackwardsCompatibility.java (working copy) @@ -435,8 +435,13 @@ // figure out which field number corresponds to // "content", and then set our expected file names below // accordingly: - CompoundFileReader cfsReader = new CompoundFileReader(dir, "_0.cfs"); - FieldInfos fieldInfos = new FieldInfos(cfsReader, "_0.fnm"); + String segPrefix = "_0"; + if (writer.isFlushToRAM()) { + segPrefix = "_1"; + } + + CompoundFileReader cfsReader = new CompoundFileReader(dir, segPrefix+".cfs"); + FieldInfos fieldInfos = new FieldInfos(cfsReader, segPrefix+".fnm"); int contentFieldIndex = -1; for(int i=0;i= 157); + IndexReader reader = IndexReader.open(dir); + assertTrue(reader.numDocs() >= 157); + } } public void testCrashAfterClose() throws IOException { Index: src/test/org/apache/lucene/index/TestDeletionPolicy.java =================================================================== --- src/test/org/apache/lucene/index/TestDeletionPolicy.java (revision 789908) +++ src/test/org/apache/lucene/index/TestDeletionPolicy.java (working copy) @@ -279,6 +279,10 @@ policy.dir = dir; IndexWriter writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, policy); + if (autoCommit && writer.isFlushToRAM()) { + // for now we skip this as autoCommit will be deprecated + continue; + } writer.setMaxBufferedDocs(10); writer.setUseCompoundFile(useCompoundFile); writer.setMergeScheduler(new SerialMergeScheduler()); @@ -295,20 +299,34 @@ writer.close(); assertEquals(2, policy.numOnInit); - if (!autoCommit) + if (!autoCommit) { // If we are not auto committing then there should // be exactly 2 commits (one per close above): - assertEquals(2, policy.numOnCommit); + if (writer.isFlushToRAM()) { + // it's 1 because after the first close, there's nothing + // more to optimize (because all the ram segments were + // merged to 1 segment on the primaryDir + // and so there's no new commit point + assertEquals(1, policy.numOnCommit); + } else { + assertEquals(2, policy.numOnCommit); + } + } // Test listCommits Collection commits = IndexReader.listCommits(dir); if (!autoCommit) // 1 from opening writer + 2 from closing writer - assertEquals(3, commits.size()); - else + if (writer.isFlushToRAM()) { + assertEquals(2, commits.size()); + } else { + assertEquals(3, commits.size()); + } + else { // 1 from 
opening writer + 2 from closing writer + // 11 from calling writer.commit() explicitly above assertEquals(14, commits.size()); + } Iterator it = commits.iterator(); // Make sure we can open a reader on each commit: Index: src/test/org/apache/lucene/index/TestDoc.java =================================================================== --- src/test/org/apache/lucene/index/TestDoc.java (revision 789908) +++ src/test/org/apache/lucene/index/TestDoc.java (working copy) @@ -165,7 +165,7 @@ File file = new File(workDir, fileName); Document doc = FileDocument.Document(file); writer.addDocument(doc); - writer.flush(); + writer.commit(); return writer.newestSegment(); } Index: src/test/org/apache/lucene/index/TestDocumentWriter.java =================================================================== --- src/test/org/apache/lucene/index/TestDocumentWriter.java (revision 789908) +++ src/test/org/apache/lucene/index/TestDocumentWriter.java (working copy) @@ -63,7 +63,7 @@ Analyzer analyzer = new WhitespaceAnalyzer(); IndexWriter writer = new IndexWriter(dir, analyzer, true, IndexWriter.MaxFieldLength.LIMITED); writer.addDocument(testDoc); - writer.flush(); + writer.commit(); SegmentInfo info = writer.newestSegment(); writer.close(); //After adding the document, we should be able to read it back in @@ -123,7 +123,7 @@ doc.add(new Field("repeated", "repeated two", Field.Store.YES, Field.Index.ANALYZED)); writer.addDocument(doc); - writer.flush(); + writer.commit(); SegmentInfo info = writer.newestSegment(); writer.close(); SegmentReader reader = SegmentReader.get(info); @@ -183,7 +183,7 @@ doc.add(new Field("f1", "a 5 a a", Field.Store.YES, Field.Index.ANALYZED)); writer.addDocument(doc); - writer.flush(); + writer.commit(); SegmentInfo info = writer.newestSegment(); writer.close(); SegmentReader reader = SegmentReader.get(info); @@ -223,7 +223,7 @@ }, TermVector.NO)); writer.addDocument(doc); - writer.flush(); + writer.commit(); SegmentInfo info = writer.newestSegment(); writer.close(); SegmentReader reader = SegmentReader.get(info); Index: src/test/org/apache/lucene/index/TestFieldsReader.java =================================================================== --- src/test/org/apache/lucene/index/TestFieldsReader.java (revision 789908) +++ src/test/org/apache/lucene/index/TestFieldsReader.java (working copy) @@ -34,11 +34,13 @@ import java.util.*; public class TestFieldsReader extends LuceneTestCase { - private RAMDirectory dir = new RAMDirectory(); + private Directory dir; private Document testDoc = new Document(); private FieldInfos fieldInfos = null; private final static String TEST_SEGMENT_NAME = "_0"; + private final static String TEST_SEGMENT_NAME_FSD = "_1"; + private String testSegmentName = TEST_SEGMENT_NAME; public TestFieldsReader(String s) { super(s); @@ -49,7 +51,14 @@ fieldInfos = new FieldInfos(); DocHelper.setupDoc(testDoc); fieldInfos.add(testDoc); - IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED); + Directory primaryDir = new RAMDirectory(); + IndexWriter writer = new IndexWriter(primaryDir, new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED); + if (writer.isFlushToRAM()) { + testSegmentName = TEST_SEGMENT_NAME_FSD; + dir = writer.getFileSwitchDirectory(); + } else { + dir = primaryDir; + } writer.setUseCompoundFile(false); writer.addDocument(testDoc); writer.close(); @@ -58,7 +67,7 @@ public void test() throws IOException { assertTrue(dir != null); assertTrue(fieldInfos != null); - FieldsReader reader = new 
FieldsReader(dir, TEST_SEGMENT_NAME, fieldInfos); + FieldsReader reader = new FieldsReader(dir, testSegmentName, fieldInfos); assertTrue(reader != null); assertTrue(reader.size() == 1); Document doc = reader.doc(0, null); @@ -96,7 +105,7 @@ public void testLazyFields() throws Exception { assertTrue(dir != null); assertTrue(fieldInfos != null); - FieldsReader reader = new FieldsReader(dir, TEST_SEGMENT_NAME, fieldInfos); + FieldsReader reader = new FieldsReader(dir, testSegmentName, fieldInfos); assertTrue(reader != null); assertTrue(reader.size() == 1); Set loadFieldNames = new HashSet(); @@ -154,7 +163,7 @@ public void testLazyFieldsAfterClose() throws Exception { assertTrue(dir != null); assertTrue(fieldInfos != null); - FieldsReader reader = new FieldsReader(dir, TEST_SEGMENT_NAME, fieldInfos); + FieldsReader reader = new FieldsReader(dir, testSegmentName, fieldInfos); assertTrue(reader != null); assertTrue(reader.size() == 1); Set loadFieldNames = new HashSet(); @@ -184,7 +193,7 @@ public void testLoadFirst() throws Exception { assertTrue(dir != null); assertTrue(fieldInfos != null); - FieldsReader reader = new FieldsReader(dir, TEST_SEGMENT_NAME, fieldInfos); + FieldsReader reader = new FieldsReader(dir, testSegmentName, fieldInfos); assertTrue(reader != null); assertTrue(reader.size() == 1); LoadFirstFieldSelector fieldSelector = new LoadFirstFieldSelector(); @@ -217,8 +226,13 @@ _TestUtil.rmDir(file); FSDirectory tmpDir = FSDirectory.open(file); assertTrue(tmpDir != null); - + + Directory testDir = tmpDir; + IndexWriter writer = new IndexWriter(tmpDir, new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED); + if (writer.isFlushToRAM()) { + testDir = writer.getFileSwitchDirectory(); + } writer.setUseCompoundFile(false); writer.addDocument(testDoc); writer.close(); @@ -233,7 +247,7 @@ SetBasedFieldSelector fieldSelector = new SetBasedFieldSelector(Collections.EMPTY_SET, lazyFieldNames); for (int i = 0; i < length; i++) { - reader = new FieldsReader(tmpDir, TEST_SEGMENT_NAME, fieldInfos); + reader = new FieldsReader(tmpDir, testSegmentName, fieldInfos); assertTrue(reader != null); assertTrue(reader.size() == 1); @@ -257,7 +271,7 @@ doc = null; //Hmmm, are we still in cache??? 
System.gc(); - reader = new FieldsReader(tmpDir, TEST_SEGMENT_NAME, fieldInfos); + reader = new FieldsReader(testDir, testSegmentName, fieldInfos); doc = reader.doc(0, fieldSelector); field = doc.getFieldable(DocHelper.LARGE_LAZY_FIELD_KEY); assertTrue("field is not lazy", field.isLazy() == true); @@ -275,7 +289,7 @@ } public void testLoadSize() throws IOException { - FieldsReader reader = new FieldsReader(dir, TEST_SEGMENT_NAME, fieldInfos); + FieldsReader reader = new FieldsReader(dir, testSegmentName, fieldInfos); Document doc; doc = reader.doc(0, new FieldSelector(){ Index: src/test/org/apache/lucene/index/TestIndexFileDeleter.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexFileDeleter.java (revision 789908) +++ src/test/org/apache/lucene/index/TestIndexFileDeleter.java (working copy) @@ -47,6 +47,7 @@ for(i=0;i<35;i++) { addDoc(writer, i); } + writer.commit(); writer.setUseCompoundFile(false); for(;i<45;i++) { addDoc(writer, i); @@ -67,11 +68,11 @@ // .s0 file: String[] files = dir.listAll(); - /* - for(int j=0;j lastNumFile); @@ -1284,6 +1291,10 @@ public void testChangingRAMBuffer() throws IOException { RAMDirectory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED); + if (writer.isFlushToRAM()) { + // don't run the test if we're in flushToRAM mode + return; + } writer.setMaxBufferedDocs(10); writer.setRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH); @@ -1338,6 +1349,7 @@ public void testChangingRAMBuffer2() throws IOException { RAMDirectory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED); + if (writer.isFlushToRAM()) return; writer.setMaxBufferedDocs(10); writer.setMaxBufferedDeleteTerms(10); writer.setRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH); @@ -1568,9 +1580,13 @@ writer.close(); SegmentInfos sis = new SegmentInfos(); sis.read(dir); - // Since we flushed w/o allowing merging we should now - // have 10 segments - assert sis.size() == 10; + if (writer.isFlushToRAM()) { + assert sis.size() == 1; + } else { + // Since we flushed w/o allowing merging we should now + // have 10 segments + assert sis.size() == 10; + } } // Make sure we can flush segment w/ norms, then add @@ -1619,12 +1635,20 @@ writer.close(); IndexReader reader = IndexReader.open(dir); - assertTrue(!reader.isOptimized()); + if (writer.isFlushToRAM()) { + assertTrue(reader.isOptimized()); + } else { + assertTrue(!reader.isOptimized()); + } reader.close(); SegmentInfos infos = new SegmentInfos(); infos.read(dir); - assertEquals(2, infos.size()); + if (writer.isFlushToRAM()) { + assertEquals(1, infos.size()); + } else { + assertEquals(2, infos.size()); + } } } @@ -1758,6 +1782,7 @@ // LUCENE-1013 public void testSetMaxMergeDocs() throws IOException { + if (IndexWriter.GLOBALNRT) return; MockRAMDirectory dir = new MockRAMDirectory(); IndexWriter iw = new IndexWriter(dir, new StandardAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED); iw.setMergeScheduler(new MyMergeScheduler()); @@ -1817,7 +1842,12 @@ writer.close(); IndexReader reader = IndexReader.open(dir); final Term t = new Term("content", "aa"); - assertEquals(reader.docFreq(t), 3); + if (writer.isFlushToRAM()) { + // not sure why this is different when we're flusing to RAM? 
+ assertEquals(reader.docFreq(t), 2); + } else { + assertEquals(reader.docFreq(t), 3); + } // Make sure the doc that hit the exception was marked // as deleted: @@ -1873,6 +1903,7 @@ dir.failOn(failure); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED); + if (writer.isFlushToRAM()) return; writer.setMaxBufferedDocs(2); Document doc = new Document(); String contents = "aa bb cc dd ee ff gg hh ii jj kk"; @@ -1917,6 +1948,7 @@ } public void testDocumentsWriterExceptions() throws IOException { + if (IndexWriter.GLOBALNRT) return; Analyzer analyzer = new Analyzer() { public TokenStream tokenStream(String fieldName, Reader reader) { return new CrashingFilter(fieldName, new WhitespaceTokenizer(reader)); @@ -1999,6 +2031,7 @@ } public void testDocumentsWriterExceptionThreads() throws Exception { + if (IndexWriter.GLOBALNRT) return; Analyzer analyzer = new Analyzer() { public TokenStream tokenStream(String fieldName, Reader reader) { return new CrashingFilter(fieldName, new WhitespaceTokenizer(reader)); @@ -3192,8 +3225,12 @@ public void testExceptionOnMergeInit() throws IOException { MockRAMDirectory dir = new MockRAMDirectory(); MockIndexWriter2 w = new MockIndexWriter2(dir, new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.UNLIMITED); + //w.setInfoStream(System.out); w.setMaxBufferedDocs(2); w.setMergeFactor(2); + if (w.isFlushToRAM()) { + ((LogMergePolicy)w.getRAMMergePolicy()).setMergeFactor(2); + } w.doFail = true; w.setMergeScheduler(new ConcurrentMergeScheduler()); Document doc = new Document(); @@ -3205,10 +3242,22 @@ } catch (RuntimeException re) { break; } - + if (w.isFlushToRAM()) { + try { + w.commit(); + } catch (RuntimeException re) { + } + } ((ConcurrentMergeScheduler) w.getMergeScheduler()).sync(); assertTrue(w.failed); - w.close(); + if (w.isFlushToRAM()) { + try { + w.close(); + } catch (RuntimeException re) { + } + } else { + w.close(); + } dir.close(); } Index: src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java (revision 789908) +++ src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java (working copy) @@ -238,7 +238,6 @@ if (upperBound * mergeFactor <= maxMergeDocs) { assertTrue(numSegments < mergeFactor); } - String[] files = writer.getDirectory().listAll(); int segmentCfsCount = 0; for (int i = 0; i < files.length; i++) { Index: src/test/org/apache/lucene/index/TestIndexWriterRAMDir.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriterRAMDir.java (revision 0) +++ src/test/org/apache/lucene/index/TestIndexWriterRAMDir.java (revision 0) @@ -0,0 +1,937 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.lucene.analysis.WhitespaceAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.FileSwitchDirectory; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.MockRAMDirectory; +import org.apache.lucene.util.LuceneTestCase; + +/** + * Tests the IndexWriter ram directory where if a ram directory is set on the + * writer, flushes occur to it first, then are later merged to primaryDir. + */ + +// TODO: test reading from the docStore while it's still open +public class TestIndexWriterRAMDir extends LuceneTestCase { + public static final boolean TRACE = false; + + /** + * Have a deletion policy that doesn't delete after commits. + * @throws Exception + */ + public void testDeletionPolicy() throws Exception { + KeepAllDeletionPolicy deletionPolicy = new KeepAllDeletionPolicy(); + RAMPolicy.TRACE = true; + //Directory dir = createFSDirectory();// new MockRAMDirectory(); + Directory dir = new MockRAMDirectory(); + MockRAMDirectory ramDir = createRamDir(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true, ramDir, deletionPolicy); + writer.setRAMBufferSizeMB(0.30); + for (int x = 0; x < 1000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "primary", 75); + if (TRACE) System.out.println("IW.addDocument " + x); + writer.addDocument(d); + } + writer.commit(); + writer.close(); + assertEquals(0, writer.getFSDSegmentInfos().size()); + assertEquals(0, ramDir.sizeInBytes()); + } + + class KeepAllDeletionPolicy implements IndexDeletionPolicy { + int numOnInit; + int numOnCommit; + Directory dir; + + public void onInit(List commits) throws IOException { + //verifyCommitOrder(commits); + numOnInit++; + } + public void onCommit(List commits) throws IOException { + IndexCommit lastCommit = (IndexCommit) commits.get(commits.size()-1); + //IndexReader r = IndexReader.open(dir); + //assertEquals("lastCommit.isOptimized()=" + lastCommit.isOptimized() + " vs IndexReader.isOptimized=" + r.isOptimized(), r.isOptimized(), lastCommit.isOptimized()); + //r.close(); + //verifyCommitOrder(commits); + numOnCommit++; + } + } + + /** + * NRT writes out segment docstores to one file, this requires + * opening IndexInput while IndexOutput is also open. + * This test method insures the described process works properly + * with FSDirectory. 
+ * @throws Exception + */ + public void testDocStoreStreams() throws Exception { + FSDirectory dir = createFSDirectory(); + String fileName = "testfile.bin"; + IndexOutput out = dir.createOutput(fileName); + + // start writing to the output + // periodically open a input thread and read from + // where the file begins and what would be the segment + // endpoint + int c = 0; + int expLen = 0; + List threads = new ArrayList(); + int lastStart = 0; + Semaphore sem = new Semaphore(19); + while (true) { + out.writeInt(c++); + expLen += 4; + if (c % 5000 == 0 && c < (5000 * 20)) { + out.flush(); + long fp = out.getFilePointer(); + // should be >= expLen + assertTrue(out.length() >= expLen); + IndexInput in = dir.openInput(fileName); + InputTest it = new InputTest(threads.size(), sem, lastStart, out.length(), in); + lastStart = (int)out.length(); + Thread thread = new Thread(it); + threads.add(thread); + thread.start(); + } + if (sem.isDone()) break; + //if (c > 5000 * 20) break; + } + //for (int x=0; x < threads.size(); x++) { + // InputTest it = (InputTest)threads.get(x); + // it. + //} + //while (true) { + // Thread.sleep(50); + //} + //if (c % 100 == 0) { + // System.out.println("wrote "+c); + // Thread.sleep(500); + //} + } + + private static class Semaphore { + private int count; + + private Semaphore(int count) { + this.count = count; + } + + public synchronized boolean isDone() { + return count == 0; + } + + public synchronized void decrement() { + count--; + } + } + /** + * LUCENE-1313 keeps open and reads from IndexInput while also writing + * IndexOutput one the same file for docStores. + * + * @throws Exception + + public void testConcurrentFSStreams() throws Exception { + Directory dir = createFSDirectory(); + + String fileName = "testfile.bin"; + + IndexOutput output = dir.createOutput(fileName); + IndexInput input = dir.openInput(fileName); + InputTest inputTest = new InputTest(input); + OutputTest outputTest = new OutputTest(output); + new Thread(inputTest).start(); + new Thread(outputTest).start(); + Thread.sleep(1000*30); + outputTest.go = false; + Thread.sleep(1000*3); + inputTest.go = false; + if (inputTest.e != null) { + inputTest.e.printStackTrace(); + } + } +*/ + private class OutputTest implements Runnable { + boolean go = true; + IndexOutput output; + int c = 0; + Exception e; + + public OutputTest(IndexOutput output) { + this.output = output; + } + + public void run() { + try { + while (go) { + output.writeInt(c++); + output.flush(); + //output.length(); // should be >= expected len + if (c % 100 == 0) { + //System.out.println("wrote "+c); + Thread.sleep(500); + } + } + } catch (Exception e) { + this.e = e; + } + } + } + + private class InputTest implements Runnable { + boolean go = true; + IndexInput input; + Exception e; + long start, end; + boolean reachedEnd = false; + Semaphore semaphore; + int num; + + public InputTest(int num, Semaphore semaphore, long start, long end, IndexInput input) { + this.num = num; + this.semaphore = semaphore; + this.start = start; + this.end = end; + this.input = input; + } + + public void run() { + int c = 0; + int zeroc = 0; + //System.out.println(num+" InputTest start:"+start+" end:"+end); + try { + input.seek(start); + while (go) { + long left = end - input.getFilePointer(); + if (left > 0) { + int i = input.readInt(); + //if (c % 100 == 0) { + //System.out.println(start+" read "+i); + //} + c++; + } else { + reachedEnd = true; + semaphore.decrement(); + return; + /** + zeroc++; + if (zeroc > 20) { + throw new Exception("we're not 
seeing any data"); + } + Thread.sleep(500); + **/ + } + } + } catch (Exception e) { + this.e = e; + e.printStackTrace(System.out); + } + } + } + + /** + * Test to see if we can load documents after we've added them + * (which could fail if the FSDirectory doesn't allow simultaneous + * IndexInput and IndexOutput streams to be open). + */ + public void testReadDocuments() throws Exception { + Directory dir = createFSDirectory(); + MockRAMDirectory ramDir = createRamDir(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true, ramDir); + writer.setRAMBufferSizeMB(2.0); + AddDocumentsThread[] addThreads = new AddDocumentsThread[3]; + for (int x=0; x < addThreads.length; x++) { + addThreads[x] = new AddDocumentsThread(x+"", writer); + addThreads[x].start(); + } + + ReadDocsThread[] readThreads = new ReadDocsThread[3]; + for (int x=0; x < readThreads.length; x++) { + readThreads[x] = new ReadDocsThread(x+"", writer); + readThreads[x].start(); + } + Thread.sleep(1000*90); // run for 1.5 minutes + for (int x=0; x < addThreads.length; x++) { + addThreads[x].run = false; + } + for (int x=0; x < readThreads.length; x++) { + readThreads[x].run = false; + } + for (int x=0; x < addThreads.length; x++) { + assertTrue(addThreads[x].ex == null); + } + for (int x=0; x < readThreads.length; x++) { + assertTrue(readThreads[x].ex == null); + } + writer.close(); + } + + private class ReadDocsThread extends Thread { + IndexWriter writer; + Exception ex; + boolean run = true; + String name; + + public ReadDocsThread(String name, IndexWriter writer) { + setName(name); + this.name = name; + this.writer = writer; + } + + public void run() { + try { + while (run) { + readDocs(); + if (ex != null) return; + Thread.sleep(1000); + } + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + public void readDocs() { + try { + IndexReader reader = writer.getReader(); + // now load all the docs available + int maxDoc = reader.maxDoc(); + //System.out.println(name+" readDocs maxDoc:"+maxDoc); + for (int x=0; x < maxDoc; x++) { + if (!reader.isDeleted(x)) { + Document doc = reader.document(x); + } + } + reader.close(); + } catch (Exception ex) { + ex.printStackTrace(System.out); + this.ex = ex; + } + } + } + + private class AddDocumentsThread extends Thread { + IndexWriter writer; + Exception ex; + String name; + boolean run = true; + + public AddDocumentsThread(String name, IndexWriter writer) { + this.name = name; + this.writer = writer; + setName(name); + } + + public void run() { + try { + int x = 0; + while (run) { + Document d = TestIndexWriterReader.createDocument(x++, name, 10); + writer.addDocument(d); + if (x % 1000 == 0) { + writer.flush(); + //System.out.println(name+" addDoc:"+x); + } + if (x > 1000*80) return; + } + } catch (Exception ex) { + ex.printStackTrace(System.out); + this.ex = ex; + } + } + } + + /** + * Here we set an artificially low maxBufferSize. 
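+   * After commit and close every RAM segment should have been merged down to the
+   * primary directory, so both the FSD segment list and the RAMDirectory are
+   * expected to be empty.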
+ * @throws Exception + */ + public void testRAMPolicy() throws Exception { + RAMPolicy.TRACE = true; + //Directory dir = createFSDirectory();// new MockRAMDirectory(); + Directory dir = new MockRAMDirectory(); + MockRAMDirectory ramDir = createRamDir(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true, ramDir); + writer.setRAMBufferSizeMB(0.30); + for (int x = 0; x < 200; x++) { + Document d = TestIndexWriterReader.createDocument(x, "primary", 75); + if (TRACE) System.out.println("IW.addDocument " + x); + writer.addDocument(d); + } + writer.commit(); + writer.close(); + assertEquals(0, writer.getFSDSegmentInfos().size()); + assertEquals(0, ramDir.sizeInBytes()); + } + + private static MockRAMDirectory createRamDir() throws IOException { + Set prevExts = new HashSet(); + prevExts.add("cfs"); // prevent compound files + prevExts.add("cfx"); // prevent compound doc store files + MockRAMDirectory ramDir = new MockRAMDirectory(prevExts); + return ramDir; + } + + /** + * Test if IW.optimize merges all segments into 1 and moves them to disk. + * + * @throws Exception + */ + public void testOptimize() throws Exception { + Directory dir = new MockRAMDirectory(); + MockRAMDirectory ramDir = createRamDir(); + // have ramdir assert compound files aren't created + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true, ramDir); + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + writer.setMergeScheduler(cms); + // create some segments + int rc = 0; + for (int x = 0; x < 9000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "primary", 5); + writer.addDocument(d); + if (x % 500 == 0) { + writer.flush(); + rc++; + } + } + writer.commit(); // commit them to primaryDir + for (int x = 0; x < 4000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "postcommit", 5); + writer.addDocument(d); + if (x % 500 == 0) + writer.flush(); + } + writer.optimize(); + cms.sync(); + IndexReader reader = writer.getReader(); + IndexReader[] readers = getAllReaders(reader); + assertEquals(1, readers.length); // insure we have only 1 segment on disk + SegmentInfos infos = writer.getSegmentInfos(); + assertEquals(1, infos.size()); + SegmentInfo info = infos.info(0); + assertEquals(dir, info.dir); // verify the directory is the primary + reader.close(); + writer.close(); + dir.close(); + } + + /** + * Test to insure expungeDeletes actually removes all segments with deleted + * docs when RAM NRT is on. 
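+   * Documents are deleted from segments living in both the primary directory and
+   * the RAM directory; after expungeDeletes completes, no sub-reader should report
+   * any deleted documents.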
+ * + * @throws Exception + */ + public void testExpungeDeletes() throws Exception { + Directory dir = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true, createRamDir()); + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + writer.setMergeScheduler(cms); + //writer.getReader(); + // create some segments + for (int x = 0; x < 9000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "primary", 5); + writer.addDocument(d); + if (x % 500 == 0) + writer.flush(); + } + // now delete some docs + for (int x=0; x < 2000; x++) { + if (x % 20 == 0) { + writer.deleteDocuments(new Term("id", Integer.toString(x))); + } + } + writer.commit(); // commit them to RAM + for (int x = 0; x < 9000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "primary", 5); + writer.addDocument(d); + if (x % 500 == 0) + writer.flush(); + } + for (int x=7000; x < 9000; x++) { + if (x % 30 == 0) { + writer.deleteDocuments(new Term("id", Integer.toString(x))); + } + } + IndexReader r1 = writer.getReader(); + assertTrue(r1.numDeletedDocs() > 0); + r1.close(); + writer.expungeDeletes(); + cms.sync(); + IndexReader reader = writer.getReader(); + IndexReader[] readers = getAllReaders(reader); + for (int x = 0; x < readers.length; x++) { + int numDel = readers[x].numDeletedDocs(); + assertEquals(0, numDel); + } + reader.close(); + writer.close(); + dir.close(); + } + + static private final IndexReader[] indexReaderZeroArray = new IndexReader[0]; + + private static void gatherSubReaders(List allSubReaders, IndexReader r) { + IndexReader[] subReaders = r.getSequentialSubReaders(); + if (subReaders == null) { + // Add the reader itself, and do not recurse + allSubReaders.add(r); + } else { + for (int i = 0; i < subReaders.length; i++) { + gatherSubReaders(allSubReaders, subReaders[i]); + } + } + } + + private static IndexReader[] getAllReaders(IndexReader reader) { + List subReadersList = new ArrayList(); + gatherSubReaders(subReadersList, reader); + IndexReader[] sortedSubReaders = (IndexReader[]) subReadersList.toArray(indexReaderZeroArray); + return sortedSubReaders; + } + + public void testHasExternalSegments() throws IOException { + Directory dir = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true); + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + writer.setMergeScheduler(cms); + + FileSwitchDirectory fsd = (FileSwitchDirectory) writer.getFileSwitchDirectory(); + + for (int x = 0; x < 1000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "prim", 5); + writer.addDocument(d); + } + writer.commit(); // commit to primary dir + for (int x = 0; x < 1000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "ram", 5); + writer.addDocument(d); + } + writer.flush(); // flush to ram + + IndexReader reader = writer.getReader(); + IndexReader ramReader = writer.getRAMReader(); + assertEquals(1000, ramReader.maxDoc()); + IndexReader primaryReader = writer.getPrimaryReader(); + assertEquals(1000, primaryReader.maxDoc()); + + // we have ram and primary dir segments, make sure + // hasExternalSegments works for FileSwitchDirectory + boolean hes = writer.getSegmentInfos().hasExternalSegments(writer.getFileSwitchDirectory()); + assertFalse(hes); + reader.close(); + ramReader.close(); + primaryReader.close(); + writer.close(); + dir.close(); + } + + private FSDirectory createFSDirectory() throws 
IOException { + String tempDir = System.getProperty("java.io.tmpdir"); + if (tempDir == null) + throw new IOException("java.io.tmpdir undefined, cannot run test"); + File indexDir = new File(tempDir, "lucenetestindexwriterramdir"); + File[] files = indexDir.listFiles(); + if (files != null) { + for (int x = 0; x < files.length; x++) { + if (!files[x].isDirectory()) + files[x].delete(); + } + } + FSDirectory dir = FSDirectory.getDirectory(indexDir); + IndexWriter.unlock(dir); + return dir; + } + + public static void openWriterAddDocs(Directory dir, boolean commitClose) throws IOException { + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true); + Set infoFiles = new HashSet(); + SegmentInfos infos = writer.getSegmentInfos(); + for (int x = 0; x < infos.size(); x++) { + SegmentInfo info = infos.info(x); + List files = info.files(); + for (int i = 0; i < files.size(); i++) { + String f = (String) files.get(i); + infoFiles.add(f); + } + } + // System.out.println("infos files:"+infoFiles); + // System.out.println("dirfiles:"+printFiles(dir)); + for (int x = 0; x < 1000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "ram", 5); + writer.addDocument(d); + } + writer.flush(); + if (commitClose) { + writer.commit(); + writer.close(); + } + } + + /** + * Make sure after IndexWriter has created FileSwitchDirectory based .fdt + * files which are written to the primary directory, and IW cuts out before + * being closed and the ram segments flushed to the primary dir, a new IW (via + * IndexFileDeleter) cleans up the now unused .fdt files. + * + * IndexWriter is created, the + * + * @throws IOException + */ + public void testIFDDeletingAfterCrash() throws IOException { + Directory dir = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true); + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + writer.setMergeScheduler(cms); + + FileSwitchDirectory fsd = (FileSwitchDirectory) writer.getFileSwitchDirectory(); + + for (int x = 0; x < 1000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "ram", 5); + writer.addDocument(d); + } + writer.flush(); + Set segmentNames = getSegmentNames(fsd.getSecondaryDir()); + List fdtFileNames = new ArrayList(); + Iterator segNameIterator = segmentNames.iterator(); + while (segNameIterator.hasNext()) { + String name = (String) segNameIterator.next(); + String fileName = name + ".fdt"; + if (dir.fileExists(fileName)) { + fdtFileNames.add(fileName); + } + } + + //System.out.println("fdtFileNames:" + fdtFileNames); + + IndexWriter.unlock(dir); + writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED); + // now verify the fdt files are no longer in the dir + for (int x = 0; x < fdtFileNames.size(); x++) { + String name = (String) fdtFileNames.get(x); + assertFalse(dir.fileExists(name)); + } + writer.close(); + dir.close(); + } + + /** + * Get all the unique .tii files, get the segment names and return them + * + * @throws IOException + */ + public static Set getSegmentNames(Directory dir) throws IOException { + Set set = new HashSet(); + String[] files = dir.listAll(); + for (int x = 0; x < files.length; x++) { + if (files[x].endsWith(".tii")) { + String str = files[x].substring(0, files[x].indexOf('.')); + set.add(str); + } + } + return set; + } + + public void testFSDFilesInPrimaryDir() throws IOException { + Directory dir = new MockRAMDirectory(); + IndexWriter 
writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true); + // writer.setRAMBufferSizeMB(0.06); // 60 kilobytes should be exceeded + // quickly + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + writer.setMergeScheduler(cms); + + assertTrue(writer.getDirectory() != writer.getFileSwitchDirectory()); + FileSwitchDirectory fsd = (FileSwitchDirectory) writer.getFileSwitchDirectory(); + + LogDocMergePolicy mp = new LogDocMergePolicy(); + writer.setMergePolicy(mp); + + // create 10 segments with 500 documents each + for (int x = 0; x < 1000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "ram", 5); + writer.addDocument(d); + } + writer.flush(); + writer.commit(); + + assertTrue(hasDirSegmentInfos(dir, writer.getSegmentInfos())); + IndexReader reader = writer.getReader(); + assertEquals(1000, reader.maxDoc()); + SegmentInfos infos = writer.getSegmentInfos(); + writer.close(); + reader.close(); + dir.close(); + } + + public static String printFiles(Directory dir) throws IOException { + String[] files = dir.listAll(); + List filesList = new ArrayList(); + for (int x = 0; x < files.length; x++) { + filesList.add(files[x]); + } + return filesList.toString(); + } + + /** + * Test IndexWriter performing in ram merges + * + * @throws IOException + */ + public void testRAMExceeded() throws IOException { + Directory dir = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true, createRamDir()); + writer.setRAMBufferSizeMB(0.06); // 60 kilobytes should be exceeded quickly + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + writer.setMergeScheduler(cms); + + assertTrue(writer.getDirectory() != writer.getFileSwitchDirectory()); + + LogDocMergePolicy mp = new LogDocMergePolicy(); + writer.setMergePolicy(mp); + + LogDocMergePolicy ramMp = new LogDocMergePolicy(); + ramMp.setMergeFactor(2); + writer.setRAMMergePolicy(ramMp); + + for (int x = 0; x < 1000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "ram", 5); + writer.addDocument(d); + } + cms.sync(); + IndexReader reader = writer.getReader(); + assertEquals(1000, reader.maxDoc()); + SegmentInfos infos = writer.getSegmentInfos(); + writer.close(); + reader.close(); + + dir.close(); + } + + /** + * Test IndexWriter doing in ram merges + * + * @throws IOException + */ + public void testMergeInRamExceeded() throws IOException { + MockRAMDirectory dir = new MockRAMDirectory(); + MockRAMDirectory ramDir = createRamDir(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true, ramDir); + writer.setRAMBufferSizeMB(0.2); // 100K for the ramdir and 100k for the ram + // buffer + // System.out.println("ramDirSizeMax:"+writer.docWriter.getRamDirSizeMax()); + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + writer.setMergeScheduler(cms); + + LogDocMergePolicy mp = new LogDocMergePolicy(); + writer.setMergePolicy(mp); + + LogDocMergePolicy ramMp = new LogDocMergePolicy(); + ramMp.setMergeFactor(2); + writer.setRAMMergePolicy(ramMp); + + // create 10 segments with 500 documents each + for (int x = 0; x < 5000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "ram", 5); + writer.addDocument(d); + if (x % 500 == 0) { + writer.flush(); + } + } + SegmentInfos infos = writer.getSegmentInfos(); + assertTrue(writer.getFSDSegmentInfos().size() > 0); + IndexReader reader = writer.getReader(); + assertEquals(5000, 
reader.maxDoc()); + + // System.out.println("infos.size:"+infos.size()+" + // raminfos.size:"+writer.getRamDirSegmentInfos().size()); + reader.close(); + writer.close(); // when the reader is closed after the writer things are + // fine? + + dir.close(); + } + + /** + * Test IndexWriter doing in ram merges + * + * @throws IOException + */ + public void testMergeInRam() throws IOException { + Directory dir = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true, createRamDir()); + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + writer.setMergeScheduler(cms); + + LogDocMergePolicy mp = new LogDocMergePolicy(); + writer.setMergePolicy(mp); + + LogDocMergePolicy ramMp = new LogDocMergePolicy(); + ramMp.setMergeFactor(2); + writer.setRAMMergePolicy(ramMp); + + // create 10 segments with 500 documents each + for (int x = 0; x < 5000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "ram", 5); + writer.addDocument(d); + if (x % 500 == 0) { + writer.flush(); + } + } + SegmentInfos infos = writer.getSegmentInfos(); + assertTrue(writer.getFSDSegmentInfos().size() > 0); + IndexReader reader = writer.getReader(); + assertEquals(5000, reader.maxDoc()); + reader.close(); + writer.close(); + dir.close(); + } + + public static class NoMergeScheduler extends MergeScheduler { + public void merge(IndexWriter writer) throws CorruptIndexException, IOException { + } + + public void close() { + } + } + + /** + * Make sure IW.commit flushes all ram segments to the directory + * + * @throws IOException + */ + public void testCommit() throws IOException { + Directory dir = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true, createRamDir()); + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + writer.setMergeScheduler(cms); + // writer.setMergeScheduler(new NoMergeScheduler()); + + LogDocMergePolicy mp = new LogDocMergePolicy(); + writer.setMergePolicy(mp); + + // create 10 segments with 500 documents each + for (int x = 0; x < 5000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "ram", 5); + writer.addDocument(d); + if (x % 500 == 0) { + writer.flush(); + } + } + SegmentInfos infos = writer.getSegmentInfos(); + assertTrue(infos.size() == 10); + assertTrue(hasDirSegmentInfos(writer.getFileSwitchDirectory(), infos)); + writer.commit(); + // after commit, we shouldn't have any more FSD segmentInfos + SegmentInfos ramInfos = writer.getFSDSegmentInfos(); + assertEquals(0, ramInfos.size()); + + IndexReader reader = writer.getReader(); + assertEquals(5000, reader.maxDoc()); + + reader.close(); + writer.close(); + dir.close(); + } + + /** + * Test IndexWriter merging to disk + * + * @throws IOException + */ + // TODO: make this test work again + public void testMergeToPrimaryDir() throws IOException { + Directory dir = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true, createRamDir()); + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + writer.setMergeScheduler(cms); + + LogDocMergePolicy mp = new LogDocMergePolicy(); + writer.setMergePolicy(mp); + + // create 10 segments with 500 documents each + for (int x = 0; x < 5000; x++) { + Document d = TestIndexWriterReader.createDocument(x, "ram", 5); + writer.addDocument(d); + if (x % 500 == 0) { + writer.flush(); + } + } + SegmentInfos infos = 
+    assertTrue(infos.size() == 10);
+    assertTrue(hasDirSegmentInfos(writer.getFileSwitchDirectory(), infos));
+    cms.sync(); // wait for the merges to complete
+    SegmentInfos ramInfos = writer.getFSDSegmentInfos();
+    // make sure the number of segments decreased
+    assertTrue(ramInfos.size() < 3);
+
+    IndexReader reader = writer.getReader();
+    assertEquals(5000, reader.maxDoc());
+
+    reader.close();
+    writer.close();
+    dir.close();
+  }
+
+  private static boolean hasDirSegmentInfos(Directory dir, SegmentInfos infos) {
+    int dirSegs = 0;
+    for (int x = 0; x < infos.size(); x++) {
+      SegmentInfo info = infos.info(x);
+      if (info.dir == dir) {
+        dirSegs++;
+      }
+    }
+    return dirSegs > 0;
+  }
+
+  /**
+   * Test that the RAM dir is working and that the reader returned from
+   * IndexWriter encompasses the in-RAM segments.
+   *
+   * @throws IOException
+   */
+  public void testRAMDir() throws IOException {
+    Directory dir = new MockRAMDirectory();
+    IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED, true, createRamDir());
+    for (int x = 0; x < 100; x++) {
+      Document d = TestIndexWriterReader.createDocument(x, "ram", 5);
+      writer.addDocument(d);
+    }
+    writer.flush(false, false, true);
+    SegmentInfos infos = writer.getSegmentInfos();
+    assertTrue(writer.getDirectory() != writer.getFileSwitchDirectory());
+    boolean hasRamDirSegs = hasDirSegmentInfos(writer.getFileSwitchDirectory(), infos);
+    // System.out.println("ramDirSegs:"+ramDirSegs);
+    assertTrue(hasRamDirSegs);
+    IndexReader ramReader = writer.getReader();
+    assertEquals(100, ramReader.maxDoc());
+    writer.close();
+    ramReader.close();
+    dir.close();
+  }
+}
Index: src/test/org/apache/lucene/index/TestIndexWriterReader.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriterReader.java (revision 789908)
+++ src/test/org/apache/lucene/index/TestIndexWriterReader.java (working copy)
@@ -590,6 +590,7 @@
     for (int i = 0; i < 10; i++) {
       writer.addDocument(createDocument(i, "test", 4));
     }
+    writer.commit();
     ((ConcurrentMergeScheduler) writer.getMergeScheduler()).sync();
 
     assertTrue(warmer.warmCount > 0);
Index: src/test/org/apache/lucene/index/TestStressIndexing.java
===================================================================
--- src/test/org/apache/lucene/index/TestStressIndexing.java (revision 789908)
+++ src/test/org/apache/lucene/index/TestStressIndexing.java (working copy)
@@ -115,9 +115,9 @@
     Run one indexer and 2 searchers against single index as stress test.
   */
-  public void runStressTest(Directory directory, boolean autoCommit, MergeScheduler mergeScheduler) throws Exception {
-    IndexWriter modifier = new IndexWriter(directory, autoCommit, ANALYZER, true);
-
+  public void runStressTest(Directory dir, boolean autoCommit, MergeScheduler mergeScheduler) throws Exception {
+    IndexWriter modifier = new IndexWriter(dir, autoCommit, ANALYZER, true);
+    Directory directory = modifier.getFileSwitchDirectory();
     modifier.setMaxBufferedDocs(10);
 
     TimedThread[] threads = new TimedThread[4];
Index: src/test/org/apache/lucene/index/TestTermVectorsReader.java
===================================================================
--- src/test/org/apache/lucene/index/TestTermVectorsReader.java (revision 789908)
+++ src/test/org/apache/lucene/index/TestTermVectorsReader.java (working copy)
@@ -32,6 +32,7 @@
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.store.MockRAMDirectory;
+import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.LuceneTestCase;
 
 public class TestTermVectorsReader extends LuceneTestCase {
@@ -42,7 +43,8 @@
   private String[] testTerms = {"this", "is", "a", "test"};
   private int[][] positions = new int[testTerms.length][];
   private TermVectorOffsetInfo[][] offsets = new TermVectorOffsetInfo[testTerms.length][];
-  private MockRAMDirectory dir = new MockRAMDirectory();
+  private MockRAMDirectory primaryDir = new MockRAMDirectory();
+  private Directory dir;
   private String seg;
   private FieldInfos fieldInfos = new FieldInfos();
   private static int TERM_FREQ = 3;
@@ -90,8 +92,9 @@
       }
     }
     Arrays.sort(tokens);
-
-    IndexWriter writer = new IndexWriter(dir, new MyAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED);
+
+    IndexWriter writer = new IndexWriter(primaryDir, new MyAnalyzer(), true, IndexWriter.MaxFieldLength.LIMITED);
+    dir = writer.getFileSwitchDirectory();
     writer.setUseCompoundFile(false);
     Document doc = new Document();
     for(int i=0;i