Index: src/test/org/apache/lucene/store/MockRAMInputStream.java =================================================================== --- src/test/org/apache/lucene/store/MockRAMInputStream.java (revision 603331) +++ src/test/org/apache/lucene/store/MockRAMInputStream.java (working copy) @@ -45,11 +45,14 @@ if (!isClone) { synchronized(dir.openFiles) { Integer v = (Integer) dir.openFiles.get(name); - if (v.intValue() == 1) { - dir.openFiles.remove(name); - } else { - v = new Integer(v.intValue()-1); - dir.openFiles.put(name, v); + // Could be null when MockRAMDirectory.crash() was called + if (v != null) { + if (v.intValue() == 1) { + dir.openFiles.remove(name); + } else { + v = new Integer(v.intValue()-1); + dir.openFiles.put(name, v); + } } } } Index: src/test/org/apache/lucene/store/MockRAMOutputStream.java =================================================================== --- src/test/org/apache/lucene/store/MockRAMOutputStream.java (revision 603331) +++ src/test/org/apache/lucene/store/MockRAMOutputStream.java (working copy) @@ -30,7 +30,7 @@ public class MockRAMOutputStream extends RAMOutputStream { private MockRAMDirectory dir; private boolean first=true; - + byte[] singleByte = new byte[1]; /** Construct an empty output buffer. */ @@ -59,6 +59,11 @@ long freeSpace = dir.maxSize - dir.sizeInBytes(); long realUsage = 0; + // If MockRAMDir crashed since we were opened, then + // don't write anything: + if (dir.crashed) + throw new IOException("MockRAMDirectory was crashed"); + // Enforce disk full: if (dir.maxSize != 0 && freeSpace <= len) { // Compute the real disk free. This will greatly slow Index: src/test/org/apache/lucene/store/MockRAMDirectory.java =================================================================== --- src/test/org/apache/lucene/store/MockRAMDirectory.java (revision 603331) +++ src/test/org/apache/lucene/store/MockRAMDirectory.java (working copy) @@ -24,7 +24,10 @@ import java.util.Random; import java.util.Map; import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; import java.util.ArrayList; +import java.util.Arrays; /** * This is a subclass of RAMDirectory that adds methods @@ -40,6 +43,10 @@ double randomIOExceptionRate; Random randomState; boolean noDeleteOpenFile = true; + boolean preventDoubleWrite = true; + private Set unSyncedFiles; + private Set createdFiles; + volatile boolean crashed; // NOTE: we cannot initialize the Map here due to the // order in which our constructor actually does this @@ -47,31 +54,79 @@ // like super is called, then our members are initialized: Map openFiles; + private void init() { + if (openFiles == null) + openFiles = new HashMap(); + if (createdFiles == null) + createdFiles = new HashSet(); + if (unSyncedFiles == null) + unSyncedFiles = new HashSet(); + } + public MockRAMDirectory() { super(); - if (openFiles == null) { - openFiles = new HashMap(); - } + init(); } public MockRAMDirectory(String dir) throws IOException { super(dir); - if (openFiles == null) { - openFiles = new HashMap(); - } + init(); } public MockRAMDirectory(Directory dir) throws IOException { super(dir); - if (openFiles == null) { - openFiles = new HashMap(); - } + init(); } public MockRAMDirectory(File dir) throws IOException { super(dir); - if (openFiles == null) { + init(); + } + + /** If set to true, we throw an IOException if the same + * file is opened by createOutput, ever. 
*/ + public void setPreventDoubleWrite(boolean value) { + preventDoubleWrite = value; + } + + public synchronized void sync(String name) throws IOException { + if (crashed) + throw new IOException("cannot sync after crash"); + if (unSyncedFiles.contains(name)) + unSyncedFiles.remove(name); + } + + /** Simulates a crash of OS or machine by overwriting + * unsycned files. */ + public void crash() throws IOException { + synchronized(this) { + crashed = true; openFiles = new HashMap(); } + Iterator it = unSyncedFiles.iterator(); + unSyncedFiles = new HashSet(); + int count = 0; + while(it.hasNext()) { + String name = (String) it.next(); + RAMFile file = (RAMFile) fileMap.get(name); + if (count % 3 == 0) { + deleteFile(name, true); + } else if (count % 3 == 1) { + // Zero out file entirely + final int numBuffers = file.numBuffers(); + for(int i=0;i= 157); + } + + public void testCrashAfterClose() throws IOException { + + IndexWriter writer = initIndex(); + MockRAMDirectory dir = (MockRAMDirectory) writer.getDirectory(); + + writer.close(); + dir.crash(); + + /* + String[] l = dir.list(); + Arrays.sort(l); + for(int i=0;i lastGen); - lastGen = gen; + assertTrue(flushCount > lastFlushCount); + lastFlushCount = flushCount; writer.setRAMBufferSizeMB(0.000001); writer.setMaxBufferedDocs(IndexWriter.DISABLE_AUTO_FLUSH); } else if (j < 20) { - assertTrue(gen > lastGen); - lastGen = gen; + assertTrue(flushCount > lastFlushCount); + lastFlushCount = flushCount; } else if (20 == j) { writer.setRAMBufferSizeMB(16); writer.setMaxBufferedDocs(IndexWriter.DISABLE_AUTO_FLUSH); - lastGen = gen; + lastFlushCount = flushCount; } else if (j < 30) { - assertEquals(gen, lastGen); + assertEquals(flushCount, lastFlushCount); } else if (30 == j) { writer.setRAMBufferSizeMB(0.000001); writer.setMaxBufferedDocs(IndexWriter.DISABLE_AUTO_FLUSH); } else if (j < 40) { - assertTrue(gen> lastGen); - lastGen = gen; + assertTrue(flushCount> lastFlushCount); + lastFlushCount = flushCount; } else if (40 == j) { writer.setMaxBufferedDocs(10); writer.setRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH); - lastGen = gen; + lastFlushCount = flushCount; } else if (j < 50) { - assertEquals(gen, lastGen); + assertEquals(flushCount, lastFlushCount); writer.setMaxBufferedDocs(10); writer.setRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH); } else if (50 == j) { - assertTrue(gen > lastGen); + assertTrue(flushCount > lastFlushCount); } } writer.close(); @@ -1320,46 +1319,46 @@ writer.addDocument(doc); } - long lastGen = -1; + int lastFlushCount = -1; for(int j=1;j<52;j++) { writer.deleteDocuments(new Term("field", "aaa" + j)); _TestUtil.syncConcurrentMerges(writer); - long gen = SegmentInfos.generationFromSegmentsFileName(SegmentInfos.getCurrentSegmentFileName(dir.list())); + int flushCount = writer.getFlushCount(); if (j == 1) - lastGen = gen; + lastFlushCount = flushCount; else if (j < 10) { // No new files should be created - assertEquals(gen, lastGen); + assertEquals(flushCount, lastFlushCount); } else if (10 == j) { - assertTrue(gen > lastGen); - lastGen = gen; + assertTrue(flushCount > lastFlushCount); + lastFlushCount = flushCount; writer.setRAMBufferSizeMB(0.000001); writer.setMaxBufferedDeleteTerms(IndexWriter.DISABLE_AUTO_FLUSH); } else if (j < 20) { - assertTrue(gen > lastGen); - lastGen = gen; + assertTrue(flushCount > lastFlushCount); + lastFlushCount = flushCount; } else if (20 == j) { writer.setRAMBufferSizeMB(16); writer.setMaxBufferedDeleteTerms(IndexWriter.DISABLE_AUTO_FLUSH); - lastGen = gen; + lastFlushCount = flushCount; } 
else if (j < 30) { - assertEquals(gen, lastGen); + assertEquals(flushCount, lastFlushCount); } else if (30 == j) { writer.setRAMBufferSizeMB(0.000001); writer.setMaxBufferedDeleteTerms(IndexWriter.DISABLE_AUTO_FLUSH); } else if (j < 40) { - assertTrue(gen> lastGen); - lastGen = gen; + assertTrue(flushCount> lastFlushCount); + lastFlushCount = flushCount; } else if (40 == j) { writer.setMaxBufferedDeleteTerms(10); writer.setRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH); - lastGen = gen; + lastFlushCount = flushCount; } else if (j < 50) { - assertEquals(gen, lastGen); + assertEquals(flushCount, lastFlushCount); writer.setMaxBufferedDeleteTerms(10); writer.setRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH); } else if (50 == j) { - assertTrue(gen > lastGen); + assertTrue(flushCount > lastFlushCount); } } writer.close(); @@ -1807,12 +1806,19 @@ public void eval(MockRAMDirectory dir) throws IOException { if (doFail) { StackTraceElement[] trace = new Exception().getStackTrace(); + boolean sawAppend = false; + boolean sawFlush = false; for (int i = 0; i < trace.length; i++) { - if ("appendPostings".equals(trace[i].getMethodName()) && count++ == 30) { - doFail = false; - throw new IOException("now failing during flush"); - } + if ("appendPostings".equals(trace[i].getMethodName())) + sawAppend = true; + if ("doFlush".equals(trace[i].getMethodName())) + sawFlush = true; } + + if (sawAppend && sawFlush && count++ >= 30) { + doFail = false; + throw new IOException("now failing during flush"); + } } } } Index: src/test/org/apache/lucene/index/TestStressIndexing.java =================================================================== --- src/test/org/apache/lucene/index/TestStressIndexing.java (revision 603331) +++ src/test/org/apache/lucene/index/TestStressIndexing.java (working copy) @@ -123,6 +123,7 @@ modifier.setMaxBufferedDocs(10); TimedThread[] threads = new TimedThread[4]; + int numThread = 0; if (mergeScheduler != null) modifier.setMergeScheduler(mergeScheduler); @@ -130,34 +131,30 @@ // One modifier that writes 10 docs then removes 5, over // and over: IndexerThread indexerThread = new IndexerThread(modifier, threads); - threads[0] = indexerThread; + threads[numThread++] = indexerThread; indexerThread.start(); - + IndexerThread indexerThread2 = new IndexerThread(modifier, threads); - threads[2] = indexerThread2; + threads[numThread++] = indexerThread2; indexerThread2.start(); // Two searchers that constantly just re-instantiate the // searcher: SearcherThread searcherThread1 = new SearcherThread(directory, threads); - threads[3] = searcherThread1; + threads[numThread++] = searcherThread1; searcherThread1.start(); SearcherThread searcherThread2 = new SearcherThread(directory, threads); - threads[3] = searcherThread2; + threads[numThread++] = searcherThread2; searcherThread2.start(); - indexerThread.join(); - indexerThread2.join(); - searcherThread1.join(); - searcherThread2.join(); + for(int i=0;i 2); - } else { + if (!autoCommit) // If we are not auto committing then there should // be exactly 2 commits (one per close above): assertEquals(2, policy.numOnCommit); - } // Simplistic check: just verify all segments_N's still // exist, and, I can open a reader on each: @@ -334,13 +331,10 @@ writer.close(); assertEquals(2, policy.numOnInit); - if (autoCommit) { - assertTrue(policy.numOnCommit > 2); - } else { + if (!autoCommit) // If we are not auto committing then there should // be exactly 2 commits (one per close above): assertEquals(2, policy.numOnCommit); - } // Simplistic check: just verify 
the index is in fact // readable: @@ -459,11 +453,8 @@ writer.close(); assertEquals(2*(N+2), policy.numOnInit); - if (autoCommit) { - assertTrue(policy.numOnCommit > 2*(N+2)-1); - } else { + if (!autoCommit) assertEquals(2*(N+2)-1, policy.numOnCommit); - } IndexSearcher searcher = new IndexSearcher(dir); Hits hits = searcher.search(query); @@ -565,11 +556,8 @@ } assertEquals(1+3*(N+1), policy.numOnInit); - if (autoCommit) { - assertTrue(policy.numOnCommit > 3*(N+1)-1); - } else { + if (!autoCommit) assertEquals(2*(N+1), policy.numOnCommit); - } IndexSearcher searcher = new IndexSearcher(dir); Hits hits = searcher.search(query); Index: src/test/org/apache/lucene/index/TestBackwardsCompatibility.java =================================================================== --- src/test/org/apache/lucene/index/TestBackwardsCompatibility.java (revision 603331) +++ src/test/org/apache/lucene/index/TestBackwardsCompatibility.java (working copy) @@ -317,7 +317,6 @@ IndexWriter writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true); writer.setRAMBufferSizeMB(16.0); - //IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); for(int i=0;i<35;i++) { addDoc(writer, i); } @@ -358,12 +357,9 @@ expected = new String[] {"_0.cfs", "_0_1.del", "_0_1.s" + contentFieldIndex, - "segments_4", + "segments_3", "segments.gen"}; - if (!autoCommit) - expected[3] = "segments_3"; - String[] actual = dir.list(); Arrays.sort(expected); Arrays.sort(actual); Index: src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java =================================================================== --- src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 603331) +++ src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (working copy) @@ -267,6 +267,10 @@ exceptions.add(exc); } + // nocommit + System.out.println("\nEXC DURING MERGE"); + exc.printStackTrace(System.out); + if (!suppressExceptions) { // suppressExceptions is normally only set during // testing. 
@@ -276,8 +280,8 @@ } } finally { synchronized(ConcurrentMergeScheduler.this) { + ConcurrentMergeScheduler.this.notifyAll(); mergeThreads.remove(this); - ConcurrentMergeScheduler.this.notifyAll(); } } } Index: src/java/org/apache/lucene/index/SegmentInfos.java =================================================================== --- src/java/org/apache/lucene/index/SegmentInfos.java (revision 603332) +++ src/java/org/apache/lucene/index/SegmentInfos.java (working copy) @@ -620,7 +620,7 @@ retry = true; } - } else { + } else if (0 == method) { // Segment file has advanced since our last loop, so // reset retry: retry = false; @@ -701,4 +701,11 @@ infos.addAll(super.subList(first, last)); return infos; } + + // nocommit javadoc + void updateGeneration(SegmentInfos other) { + assert other.generation > generation; + lastGeneration = other.lastGeneration; + generation = other.generation; + } } Index: src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- src/java/org/apache/lucene/index/IndexWriter.java (revision 603331) +++ src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -260,7 +260,7 @@ private Similarity similarity = Similarity.getDefault(); // how to normalize - private boolean commitPending; // true if segmentInfos has changes not yet committed + private volatile boolean commitPending; // true if segmentInfos has changes not yet committed private SegmentInfos rollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails private SegmentInfos localRollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails @@ -268,6 +268,9 @@ private boolean autoCommit = true; // false if we should commit only on close private SegmentInfos segmentInfos = new SegmentInfos(); // the segments + private int syncCount; + private int syncCountSaved = -1; + private DocumentsWriter docWriter; private IndexFileDeleter deleter; @@ -292,6 +295,9 @@ private List mergeExceptions = new ArrayList(); private long mergeGen; + private int flushCount; + private SegmentInfo lastMergeInfo; + /** * Used internally to throw an {@link * AlreadyClosedException} if this IndexWriter has been @@ -678,6 +684,8 @@ directory.clearLock(IndexWriter.WRITE_LOCK_NAME); } + this.autoCommit = autoCommit; + Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME); if (!writeLock.obtain(writeLockTimeout)) // obtain write lock throw new LockObtainFailedException("Index locked for write: " + writeLock); @@ -695,12 +703,21 @@ } catch (IOException e) { // Likely this means it's a fresh directory } - segmentInfos.write(directory); + commitPending = true; + sync(false, false); } else { segmentInfos.read(directory); + + // We assume that this segments_N was previously + // properly sync'd: + for(int i=0;i 0 || runningMerges.size() > 0) { try { @@ -2041,19 +2049,11 @@ /* * Called whenever the SegmentInfos has been updated and * the index files referenced exist (correctly) in the - * index directory. If we are in autoCommit mode, we - * commit the change immediately. Else, we mark - * commitPending. + * index directory. 
*/ private synchronized void checkpoint() throws IOException { - if (autoCommit) { - segmentInfos.write(directory); - commitPending = false; - if (infoStream != null) - message("checkpoint: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\""); - } else { - commitPending = true; - } + commitPending = true; + deleter.checkpoint(segmentInfos, false); } /** Merges all segments from an array of indexes into this index. @@ -2114,7 +2114,7 @@ ensureOpen(); if (infoStream != null) message("flush at addIndexes"); - flush(); + flush(true, false); boolean success = false; @@ -2176,7 +2176,7 @@ ensureOpen(); if (infoStream != null) message("flush at addIndexesNoOptimize"); - flush(); + flush(true, false); boolean success = false; @@ -2348,12 +2348,26 @@ * not be visible to readers, until {@link #close} is called. * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error + * @deprecated please call commmit() instead */ public final void flush() throws CorruptIndexException, IOException { flush(true, false); } /** + * Commits & syncs all pending updates (added & deleted + * documents) to the index. + */ + public final void commit() throws CorruptIndexException, IOException { + commit(true); + } + + private final void commit(boolean triggerMerges) throws CorruptIndexException, IOException { + flush(triggerMerges, true); + sync(false, true); + } + + /** * Flush all in-memory buffered udpates (adds and deletes) * to the Directory. * @param triggerMerge if true, we may merge segments (if @@ -2368,11 +2382,16 @@ maybeMerge(); } + // TODO: this method should not have to be entirely + // synchronized, ie, merges should be allowed to commit + // even while a flush is happening private synchronized final boolean doFlush(boolean flushDocStores) throws CorruptIndexException, IOException { // Make sure no threads are actively adding a document docWriter.pauseAllThreads(); + flushCount++; + try { SegmentInfo newSegment = null; @@ -2430,121 +2449,103 @@ // If we are flushing docs, segment must not be null: assert segment != null || !flushDocs; - if (flushDocs || flushDeletes) { + if (flushDocs) { - SegmentInfos rollback = null; - - if (flushDeletes) - rollback = (SegmentInfos) segmentInfos.clone(); - boolean success = false; + final int flushedDocCount; try { - if (flushDocs) { - - if (0 == docStoreOffset && flushDocStores) { - // This means we are flushing private doc stores - // with this segment, so it will not be shared - // with other segments - assert docStoreSegment != null; - assert docStoreSegment.equals(segment); - docStoreOffset = -1; - docStoreIsCompoundFile = false; - docStoreSegment = null; - } - - int flushedDocCount = docWriter.flush(flushDocStores); - - newSegment = new SegmentInfo(segment, - flushedDocCount, - directory, false, true, - docStoreOffset, docStoreSegment, - docStoreIsCompoundFile); - segmentInfos.addElement(newSegment); - } - - if (flushDeletes) { - // we should be able to change this so we can - // buffer deletes longer and then flush them to - // multiple flushed segments, when - // autoCommit=false - int delCount = applyDeletes(flushDocs); - if (infoStream != null) - infoStream.println("flushed " + delCount + " deleted documents"); - doAfterFlush(); - } - - checkpoint(); + flushedDocCount = docWriter.flush(flushDocStores); success = true; } finally { if (!success) { - if (infoStream != null) message("hit exception flushing segment " + segment); - - if (flushDeletes) { - - // Carefully check if any partial .del 
files - // should be removed: - final int size = rollback.size(); - for(int i=0;i 0 && - segmentInfos.info(segmentInfos.size()-1) == newSegment) - segmentInfos.remove(segmentInfos.size()-1); - } - if (flushDocs) - docWriter.abort(); - deletePartialSegmentsFile(); - deleter.checkpoint(segmentInfos, false); - - if (segment != null) - deleter.refresh(segment); + docWriter.abort(); + deleter.refresh(segment); } } + + if (0 == docStoreOffset && flushDocStores) { + // This means we are flushing private doc stores + // with this segment, so it will not be shared + // with other segments + assert docStoreSegment != null; + assert docStoreSegment.equals(segment); + docStoreOffset = -1; + docStoreIsCompoundFile = false; + docStoreSegment = null; + } - deleter.checkpoint(segmentInfos, autoCommit); + newSegment = new SegmentInfo(segment, + flushedDocCount, + directory, false, true, + docStoreOffset, docStoreSegment, + docStoreIsCompoundFile); + segmentInfos.addElement(newSegment); + checkpoint(); - if (flushDocs && mergePolicy.useCompoundFile(segmentInfos, - newSegment)) { + if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) { success = false; try { docWriter.createCompoundFile(segment); - newSegment.setUseCompoundFile(true); - checkpoint(); success = true; } finally { if (!success) { if (infoStream != null) message("hit exception creating compound file for newly flushed segment " + segment); - newSegment.setUseCompoundFile(false); deleter.deleteFile(segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION); - deletePartialSegmentsFile(); } } - deleter.checkpoint(segmentInfos, autoCommit); + newSegment.setUseCompoundFile(true); + checkpoint(); } - - return true; - } else { - return false; } + if (flushDeletes) { + SegmentInfos rollback = (SegmentInfos) segmentInfos.clone(); + + boolean success = false; + try { + // we should be able to change this so we can + // buffer deletes longer and then flush them to + // multiple flushed segments only when a commit() + // finally happens + int delCount = applyDeletes(flushDocs); + if (infoStream != null) + infoStream.println("flushed " + delCount + " deleted documents"); + success = true; + } finally { + if (!success) { + if (infoStream != null) + message("hit exception flushing deletes"); + + // Carefully check if any partial .del files + // should be removed: + final int size = rollback.size(); + for(int i=0;i docCount()) + return true; + else + return false; + } + /** Does fininishing for a merge, which is fast but holds * the synchronized lock on IndexWriter instance. 
*/ final synchronized void mergeFinish(MergePolicy.OneMerge merge) throws IOException { @@ -3026,11 +3030,10 @@ merger = new SegmentMerger(this, mergedName); + boolean success = false; + // This is try/finally to make sure merger's readers are // closed: - - boolean success = false; - try { int totDocCount = 0; @@ -3047,6 +3050,7 @@ if (merge.isAborted()) throw new IOException("merge is aborted"); + // This is where all the work happens: mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores); assert mergedDocCount == totDocCount; @@ -3059,14 +3063,6 @@ if (merger != null) { merger.closeReaders(); } - if (!success) { - if (infoStream != null) - message("hit exception during merge; now refresh deleter on segment " + mergedName); - synchronized(this) { - addMergeException(merge); - deleter.refresh(mergedName); - } - } } if (!commitMerge(merge)) @@ -3074,6 +3070,11 @@ return 0; if (merge.useCompoundFile) { + + // Force a sync here to allow reclaiming of the disk + // space used by the segments we just merged: + if (autoCommit && doSyncBeforeMergeCFS(mergedDocCount)) + sync(false, false); success = false; boolean skip = false; @@ -3106,7 +3107,7 @@ } finally { if (!success) { if (infoStream != null) - message("hit exception creating compound file during merge: skip=" + skip); + message("hit exception creating compound file during merge"); synchronized(this) { if (!skip) @@ -3116,40 +3117,37 @@ } } + if (merge.isAborted()) { + if (infoStream != null) + message("abort merge after building CFS"); + deleter.deleteFile(compoundFileName); + return 0; + } + if (!skip) { synchronized(this) { - if (skip || segmentInfos.indexOf(merge.info) == -1 || merge.isAborted()) { + if (segmentInfos.indexOf(merge.info) == -1 || merge.isAborted()) { // Our segment (committed in non-compound // format) got merged away while we were // building the compound format. deleter.deleteFile(compoundFileName); } else { - success = false; - try { - merge.info.setUseCompoundFile(true); - checkpoint(); - success = true; - } finally { - if (!success) { - if (infoStream != null) - message("hit exception checkpointing compound file during merge"); - - // Must rollback: - addMergeException(merge); - merge.info.setUseCompoundFile(false); - deletePartialSegmentsFile(); - deleter.deleteFile(compoundFileName); - } - } - - // Give deleter a chance to remove files now. - deleter.checkpoint(segmentInfos, autoCommit); + merge.info.setUseCompoundFile(true); + checkpoint(); } } } } + // We force a sync after commiting the merge. Once this + // sync completes then all index files referenced by the + // current segmentInfos are on stable storage so if the + // OS/machine crashes, or power cord is yanked, the + // index will be intact: + if (autoCommit) + sync(false, false); + return mergedDocCount; } @@ -3158,11 +3156,11 @@ mergeExceptions.add(merge); } - private void deletePartialSegmentsFile() throws IOException { - if (segmentInfos.getLastGeneration() != segmentInfos.getGeneration()) { + private void deletePartialSegmentsFile(SegmentInfos infos) throws IOException { + if (infos.getLastGeneration() != infos.getGeneration()) { String segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", - segmentInfos.getGeneration()); + infos.getGeneration()); if (infoStream != null) message("now delete partial segments file \"" + segmentFileName + "\""); @@ -3307,4 +3305,218 @@ return buffer.toString(); } + + // nocommit -- pull all of this sync logic into separate + // IndexSyncer class?? 
+ private HashSet synced = new HashSet(); + private HashSet syncing = new HashSet(); + + private boolean startSync(String fileName) { + synchronized(synced) { + if (!synced.contains(fileName) && !syncing.contains(fileName)) { + syncing.add(fileName); + return true; + } else + return false; + } + } + + private void finishSync(String fileName) { + synchronized(synced) { + assert syncing.contains(fileName); + syncing.remove(fileName); + synced.add(fileName); + synced.notifyAll(); + } + } + + private void waitForAllSynced(final SegmentInfos infos) throws IOException { + synchronized(synced) { + for(int i=0;i 0) + lastMergeInfo = toSync.info(toSync.size()-1); + + mySyncCount = syncCount++; + if (deleter != null) + deleter.incRef(toSync, false); + assert syncingCount >= 0; + syncingCount++; + } + + boolean success0 = false; + + try { + + for(int i=0;i syncCountSaved) { + + if (segmentInfos.getGeneration() > toSync.getGeneration()) + toSync.updateGeneration(segmentInfos); + + boolean success = false; + try { + toSync.write(directory); + success = true; + } finally { + if (!success) { + commitPending = true; + segmentInfos.updateGeneration(toSync); + deletePartialSegmentsFile(toSync); + } + } + + // nocommit: i think we must add checksum to end + // of segments_N file. if we crash right here, + // that file could be corrupt. likely we would + // detect such corruption, but not for sure. with + // checksum we can be much more sure. + + // OK, we wrote file successfully, but sync could + // still fail and we have to back out the commit if so: + success = false; + String segmentsFileName = null; + try { + segmentsFileName = toSync.getCurrentSegmentFileName(); + if (infoStream != null) + message("now sync segments file " + segmentsFileName); + directory.sync(segmentsFileName); + success = true; + } finally { + if (!success && segmentsFileName != null && deleter != null) { + commitPending = true; + segmentInfos.updateGeneration(toSync); + deleter.deleteFile(segmentsFileName); + } + } + + syncCountSaved = mySyncCount; + + segmentInfos.updateGeneration(toSync); + + if (deleter != null) + deleter.checkpoint(toSync, true); + } + } + + if (infoStream != null) + message("done all syncs"); + + success0 = true; + + } finally { + synchronized(this) { + if (deleter != null) + deleter.decRef(toSync); + syncingCount--; + assert syncingCount >= 0; + if (0 == syncingCount) + notifyAll(); + if (!success0) + commitPending = true; + } + } + } } Index: src/java/org/apache/lucene/index/IndexFileDeleter.java =================================================================== --- src/java/org/apache/lucene/index/IndexFileDeleter.java (revision 603331) +++ src/java/org/apache/lucene/index/IndexFileDeleter.java (working copy) @@ -32,13 +32,20 @@ /* * This class keeps track of each SegmentInfos instance that - * is still "live", either because it corresponds to a - * segments_N file in the Directory (a "commit", i.e. a - * committed SegmentInfos) or because it's the in-memory SegmentInfos - * that a writer is actively updating but has not yet committed - * (currently this only applies when autoCommit=false in IndexWriter). - * This class uses simple reference counting to map the live - * SegmentInfos instances to individual files in the Directory. + * is still "live", either because it corresponds to a + * segments_N file in the Directory (a "commit", i.e. 
a + * committed SegmentInfos) or because it's the in-memory + * SegmentInfos that a writer is actively updating but has + * not yet committedThis class uses simple reference + * counting to map the live SegmentInfos instances to + * individual files in the Directory. + * + * When autoCommit=true, IndexWriter currently commits only + * on completion of a merge (though this may change with + * time: it is not a guarantee). When autoCommit=false, + * IndexWriter only commits when it is closed. Regardless + * of autoCommit, the user may call IndexWriter.commit() to + * force a blocking commit. * * The same directory file may be referenced by more than * one IndexCommitPoints, i.e. more than one SegmentInfos. @@ -260,7 +267,7 @@ for(int i=0;i 0) { - for(int i=0;i 0) { + for(int i=0;i