Index: src/test/org/apache/lucene/index/TestIndexWriter.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriter.java	(revision 682643)
+++ src/test/org/apache/lucene/index/TestIndexWriter.java	(working copy)
@@ -22,6 +22,7 @@
 import java.io.File;
 import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;

 import org.apache.lucene.util.LuceneTestCase;
@@ -125,7 +126,7 @@
         writer.close();
     }

-    private void addDoc(IndexWriter writer) throws IOException
+    private static void addDoc(IndexWriter writer) throws IOException
     {
         Document doc = new Document();
         doc.add(new Field("content", "aaa", Field.Store.NO, Field.Index.TOKENIZED));
@@ -2338,6 +2339,9 @@
     IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);

     ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+    // We expect AlreadyClosedException
+    cms.setSuppressExceptions();
+
     writer.setMergeScheduler(cms);
     writer.setMaxBufferedDocs(10);
     writer.setMergeFactor(4);
@@ -2891,7 +2895,7 @@
     writer.setMergeScheduler(new SerialMergeScheduler());
     writer.setMergePolicy(new LogDocMergePolicy());

-    Directory[] indexDirs = { dir};
+    Directory[] indexDirs = {new MockRAMDirectory(dir)};
     writer.addIndexes(indexDirs);
     writer.close();
   }
@@ -3732,6 +3736,278 @@
     dir.close();
   }

+  private abstract static class RunAddIndexesThreads {
+
+    Directory dir, dir2;
+    final static int NUM_INIT_DOCS = 17;
+    IndexWriter writer2;
+    final List failures = new ArrayList();
+    volatile boolean didClose;
+    final IndexReader[] readers;
+    final int NUM_COPY;
+    final static int NUM_THREADS = 5;
+    final Thread[] threads = new Thread[NUM_THREADS];
+    final ConcurrentMergeScheduler cms;
+
+    public RunAddIndexesThreads(int numCopy) throws Throwable {
+      NUM_COPY = numCopy;
+      dir = new MockRAMDirectory();
+      IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+      writer.setMaxBufferedDocs(2);
+      for (int i = 0; i < NUM_INIT_DOCS; i++)
+        addDoc(writer);
+      writer.close();
+
+      dir2 = new MockRAMDirectory();
+      writer2 = new IndexWriter(dir2, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+      cms = (ConcurrentMergeScheduler) writer2.getMergeScheduler();
+
+      readers = new IndexReader[NUM_COPY];
+      for(int i=0;i
 0) { // Forward any exceptions in background merge
@@ -2310,6 +2353,12 @@
         }
       }
     }
+
+    // If close is called while we are still
+    // running, throw an exception so the calling
+    // thread will know the optimize did not
+    // complete
+    ensureOpen();
   }

   // NOTE: in the ConcurrentMergeScheduler case, when
@@ -2377,13 +2426,8 @@
         }
       }

-      if (running) {
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
-      }
+      if (running)
+        doWait();
     }
   }
 }
@@ -2500,27 +2544,50 @@
       message("now start transaction");

     assert docWriter.getNumBufferedDeleteTerms() == 0 :
-      "calling startTransaction with buffered delete terms not supported";
+      "calling startTransaction with buffered delete terms not supported: numBufferedDeleteTerms=" + docWriter.getNumBufferedDeleteTerms();

     assert docWriter.getNumDocsInRAM() == 0 :
-      "calling startTransaction with buffered documents not supported";
+      "calling startTransaction with buffered documents not supported: numDocsInRAM=" + docWriter.getNumDocsInRAM();

-    localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone();
-    localAutoCommit = autoCommit;
-    localFlushedDocCount = docWriter.getFlushedDocCount();
+    ensureOpen();

-    if (localAutoCommit) {
+    // If a transaction is trying to roll back (because
+    // addIndexes hit an exception) then wait here until
+    // that's done:
+    synchronized(this) {
+      while(stopMerges)
+        doWait();
+    }

-      if (infoStream != null)
-        message("flush at startTransaction");
+    acquireWrite();

-      flush(true, false, false);
+    boolean success = false;
+    try {
+      localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone();

-      // Turn off auto-commit during our local transaction:
-      autoCommit = false;
-    } else
-      // We must "protect" our files at this point from
-      // deletion in case we need to rollback:
-      deleter.incRef(segmentInfos, false);
+      assert !hasExternalSegments(segmentInfos);
+
+      localAutoCommit = autoCommit;
+      localFlushedDocCount = docWriter.getFlushedDocCount();
+
+      if (localAutoCommit) {
+
+        if (infoStream != null)
+          message("flush at startTransaction");
+
+        flush(true, false, false);
+
+        // Turn off auto-commit during our local transaction:
+        autoCommit = false;
+      } else
+        // We must "protect" our files at this point from
+        // deletion in case we need to rollback:
+        deleter.incRef(segmentInfos, false);
+
+      success = true;
+    } finally {
+      if (!success)
+        finishAddIndexes();
+    }
   }

 /*
@@ -2536,6 +2603,12 @@
     autoCommit = localAutoCommit;
     docWriter.setFlushedDocCount(localFlushedDocCount);

+    // Must finish merges before rolling back segmentInfos
+    // so merges don't hit exceptions on trying to commit
+    // themselves, don't get files deleted out from under
+    // them, etc:
+    finishMerges(false);
+
     // Keep the same segmentInfos instance but replace all
     // of its SegmentInfo instances.  This is so the next
     // attempt to commit using this instance of IndexWriter
@@ -2544,6 +2617,11 @@
     segmentInfos.addAll(localRollbackSegmentInfos);
     localRollbackSegmentInfos = null;

+    // This must come after we rollback segmentInfos, so
+    // that if a commit() kicks off it does not see the
+    // segmentInfos with external segments
+    finishAddIndexes();
+
     // Ask deleter to locate unreferenced files we had
     // created & remove them:
     deleter.checkpoint(segmentInfos, false);
@@ -2552,9 +2630,16 @@
     // Remove the incRef we did in startTransaction:
     deleter.decRef(segmentInfos);

+    // Also ask deleter to remove any newly created files
+    // that were never incref'd; this "garbage" is created
+    // when a merge kicks off but aborts part way through
+    // before it had a chance to incRef the files it had
+    // partially created
     deleter.refresh();
-    finishMerges(false);
-    stopMerges = false;
+
+    notifyAll();
+
+    assert !hasExternalSegments();
   }

 /*
@@ -2590,6 +2675,10 @@
     deleter.decRef(localRollbackSegmentInfos);
     localRollbackSegmentInfos = null;
+
+    assert !hasExternalSegments();
+
+    finishAddIndexes();
   }

   /**
@@ -2626,6 +2715,8 @@
     boolean success = false;

+    docWriter.pauseAllThreads();
+
     try {
       finishMerges(false);

@@ -2651,6 +2742,8 @@
       // once").
       segmentInfos.clear();
       segmentInfos.addAll(rollbackSegmentInfos);
+
+      assert !hasExternalSegments();

       docWriter.abort();

@@ -2671,6 +2764,7 @@
     } finally {
       synchronized(this) {
         if (!success) {
+          docWriter.resumeAllThreads();
           closing = false;
           notifyAll();
           if (infoStream != null)
@@ -2706,6 +2800,10 @@
         merge.abort();
       }

+      // Ensure any running addIndexes finishes
+      acquireRead();
+      releaseRead();
+
       // These merges periodically check whether they have
       // been aborted, and stop if so.  We wait here to make
       // sure they all stop.  It should not take very long
@@ -2714,25 +2812,23 @@
       while(runningMerges.size() > 0) {
         if (infoStream != null)
           message("now wait for " + runningMerges.size() + " running merge to abort");
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
+        doWait();
       }

+      stopMerges = false;
+      notifyAll();
+
       assert 0 == mergingSegments.size();

       if (infoStream != null)
         message("all running merges have aborted");

     } else {
-      while(pendingMerges.size() > 0 || runningMerges.size() > 0) {
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-        }
-      }
+      // Ensure any running addIndexes finishes
+      acquireRead();
+      releaseRead();
+      while(pendingMerges.size() > 0 || runningMerges.size() > 0)
+        doWait();
       assert 0 == mergingSegments.size();
     }
   }
@@ -2747,6 +2843,31 @@
     deleter.checkpoint(segmentInfos, false);
   }

+  private void finishAddIndexes() {
+    releaseWrite();
+  }
+
+  private void blockAddIndexes(boolean includePendingClose) {
+
+    acquireRead();
+
+    boolean success = false;
+    try {
+
+      // Make sure we are still open since we could have
+      // waited quite a while for last addIndexes to finish
+      ensureOpen(includePendingClose);
+      success = true;
+    } finally {
+      if (!success)
+        releaseRead();
+    }
+  }
+
+  private void resumeAddIndexes() {
+    releaseRead();
+  }
+
   /** Merges all segments from an array of indexes into this index.
    *
    * <p>This may be used to parallelize batch indexing.  A large document
@@ -2807,6 +2928,8 @@
     throws CorruptIndexException, IOException {

     ensureOpen();
+
+    noDupDirs(dirs);

     // Do not allow add docs or deletes while we are running:
     docWriter.pauseAllThreads();
@@ -2825,12 +2948,14 @@
       int docCount = 0;

       synchronized(this) {
+        ensureOpen();
         for (int i = 0; i < dirs.length; i++) {
           SegmentInfos sis = new SegmentInfos();   // read infos from dir
           sis.read(dirs[i]);
           for (int j = 0; j < sis.size(); j++) {
             final SegmentInfo info = sis.info(j);
             docCount += info.docCount;
+            assert !segmentInfos.contains(info);
             segmentInfos.addElement(info); // add each info
           }
         }
@@ -2862,6 +2987,17 @@
     mergeGen++;
   }

+  private void noDupDirs(Directory[] dirs) {
+    HashSet dups = new HashSet();
+    for(int i=0;i
@@ -2895,6 +3031,8 @@

     ensureOpen();

+    noDupDirs(dirs);
+
     // Do not allow add docs or deletes while we are running:
     docWriter.pauseAllThreads();

@@ -2911,6 +3049,8 @@
       int docCount = 0;

       synchronized(this) {
+        ensureOpen();
+
         for (int i = 0; i < dirs.length; i++) {
           if (directory == dirs[i]) {
             // cannot add this index: segments may be deleted in merge before added
@@ -2921,6 +3061,7 @@
           sis.read(dirs[i]);
           for (int j = 0; j < sis.size(); j++) {
             SegmentInfo info = sis.info(j);
+            assert !segmentInfos.contains(info): "dup info dir=" + info.dir + " name=" + info.name;
             docCount += info.docCount;
             segmentInfos.addElement(info); // add each info
           }
@@ -2932,6 +3073,8 @@
       maybeMerge();

+      ensureOpen();
+
       // If after merging there remain segments in the index
       // that are in a different directory, just copy these
       // over into our index.  This is necessary (before
@@ -2939,6 +3082,8 @@
       // index in an unusable (inconsistent) state.
       copyExternalSegments();

+      ensureOpen();
+
       success = true;

     } finally {
@@ -2956,6 +3101,18 @@
       }
     }

+  private boolean hasExternalSegments() {
+    return hasExternalSegments(segmentInfos);
+  }
+
+  private boolean hasExternalSegments(SegmentInfos infos) {
+    final int numSegments = infos.size();
+    for(int i=0;i
 */
+
+  private boolean committing;
+
+  synchronized private void waitForCommit() {
+    // Only allow a single thread to do the commit, at a time:
+    while(committing)
+      doWait();
+    committing = true;
+  }
+
+  synchronized private void doneCommit() {
+    committing = false;
+    notifyAll();
+  }
+
   public final void commit() throws CorruptIndexException, IOException {

-    message("commit: start");
+    ensureOpen();

-    if (autoCommit || pendingCommit == null) {
-      message("commit: now prepare");
-      prepareCommit(true);
-    } else
-      message("commit: already prepared");
+    // Only let one thread do the prepare/finish at a time
+    waitForCommit();

-    finishCommit();
+    try {
+      message("commit: start");
+
+      if (autoCommit || pendingCommit == null) {
+        message("commit: now prepare");
+        prepareCommit(true);
+      } else
+        message("commit: already prepared");
+
+      finishCommit();
+    } finally {
+      doneCommit();
+    }
   }

   private synchronized final void finishCommit() throws CorruptIndexException, IOException {
@@ -3234,7 +3444,7 @@
       pendingCommit.finishCommit(directory);
       lastCommitChangeCount = pendingCommitChangeCount;
       segmentInfos.updateGeneration(pendingCommit);
-      setRollbackSegmentInfos();
+      setRollbackSegmentInfos(pendingCommit);
       deleter.checkpoint(pendingCommit, true);
     } finally {
       deleter.decRef(pendingCommit);
@@ -3259,7 +3469,8 @@
   * be flushed
   */
  protected final void flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException {
-    ensureOpen();
+    // We can be called during close, when closing==true, so we must pass false to ensureOpen:
+    ensureOpen(false);
    if (doFlush(flushDocStores, flushDeletes) && triggerMerge)
      maybeMerge();
  }
@@ -3269,12 +3480,14 @@
   // even while a flush is happening
   private synchronized final boolean doFlush(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException {

-    // Make sure no threads are actively adding a document
+    ensureOpen(false);

    assert testPoint("startDoFlush");

    flushCount++;

+    // Make sure no threads are actively adding a document
+
    flushDeletes |= docWriter.deletesFull();

    // When autoCommit=true we must always flush deletes
@@ -3455,7 +3668,7 @@
       if (segmentInfos.indexOf(info) == -1)
         throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the index", directory);
       else
-        throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge + " vs " + segString() + "), which IndexWriter (currently) cannot handle",
+        throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge.segString(directory) + " vs " + segString() + "), which IndexWriter (currently) cannot handle",
                                              directory);
     }
   }
@@ -3572,7 +3785,7 @@
       return false;

     if (infoStream != null)
-      message("commitMerge: " + merge.segString(directory));
+      message("commitMerge: " + merge.segString(directory) + " index=" + segString());

     assert merge.registerDone;

@@ -3616,6 +3829,7 @@
     }

     segmentInfos.subList(start, start + merge.segments.size()).clear();
+    assert !segmentInfos.contains(merge.info);
     segmentInfos.add(start, merge.info);

     // Must checkpoint before decrefing so any newly
@@ -3660,7 +3874,7 @@
       mergeInit(merge);

       if (infoStream != null)
-        message("now merge\n merge=" + merge.segString(directory) + "\n index=" + segString());
+        message("now merge\n merge=" + merge.segString(directory) + "\n merge=" + merge + "\n index=" + segString());

       mergeMiddle(merge);
       success = true;
@@ -3696,10 +3910,6 @@
         updatePendingMerges(merge.maxNumSegmentsOptimize, merge.optimize);
       } finally {
         runningMerges.remove(merge);
-        // Optimize may be waiting on the final optimize
-        // merge to finish; and finishMerges() may be
-        // waiting for all merges to finish:
-        notifyAll();
       }
     }
   }
@@ -3715,11 +3925,16 @@
    * are now participating in a merge, and true is
    * returned.  Else (the merge conflicts) false is
    * returned.
    */
-  final synchronized boolean registerMerge(MergePolicy.OneMerge merge) {
+  final synchronized boolean registerMerge(MergePolicy.OneMerge merge) throws MergePolicy.MergeAbortedException {

     if (merge.registerDone)
       return true;

+    if (stopMerges) {
+      merge.abort();
+      throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString(directory));
+    }
+
     final int count = merge.segments.size();
     boolean isExternal = false;
     for(int i=0;i
 0) {
         buffer.append(' ');
       }
-      buffer.append(infos.info(i).segString(directory));
+      final SegmentInfo info = infos.info(i);
+      buffer.append(info.segString(directory));
+      if (info.dir != directory)
+        buffer.append("**");
     }
     return buffer.toString();
   }

@@ -4254,6 +4488,20 @@
     }
   }

+  private synchronized void doWait() {
+    try {
+      // NOTE: the callers of this method should in theory
+      // be able to do simply wait(), but, as a defense
+      // against thread timing hazards where notifyAll()
+      // fails to be called, we wait for at most 1 second
+      // and then return so caller can check if wait
+      // conditions are satisfied:
+      wait(1000);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
   /** Walk through all files referenced by the current
    *  segmentInfos and ask the Directory to sync each file,
    *  if it wasn't already.  If that succeeds, then we
@@ -4279,26 +4527,38 @@

       synchronized(this) {

-        assert lastCommitChangeCount <= changeCount;
+        // Wait for any running addIndexes to complete
+        // first, then block any from running until we've
+        // copied the segmentInfos we intend to sync:
+        blockAddIndexes(false);

-        if (changeCount == lastCommitChangeCount) {
-          if (infoStream != null)
-            message(" skip startCommit(): no changes pending");
-          return;
-        }
+        assert !hasExternalSegments();

-        // First, we clone & incref the segmentInfos we intend
-        // to sync, then, without locking, we sync() each file
-        // referenced by toSync, in the background.  Multiple
-        // threads can be doing this at once, if say a large
-        // merge and a small merge finish at the same time:
+        try {

-        if (infoStream != null)
-          message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
+          assert lastCommitChangeCount <= changeCount;

-        toSync = (SegmentInfos) segmentInfos.clone();
-        deleter.incRef(toSync, false);
-        myChangeCount = changeCount;
+          if (changeCount == lastCommitChangeCount) {
+            if (infoStream != null)
+              message(" skip startCommit(): no changes pending");
+            return;
+          }
+
+          // First, we clone & incref the segmentInfos we intend
+          // to sync, then, without locking, we sync() each file
+          // referenced by toSync, in the background.  Multiple
+          // threads can be doing this at once, if say a large
+          // merge and a small merge finish at the same time:
+
+          if (infoStream != null)
+            message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
+
+          toSync = (SegmentInfos) segmentInfos.clone();
+          deleter.incRef(toSync, false);
+          myChangeCount = changeCount;
+        } finally {
+          resumeAddIndexes();
+        }
       }

       assert testPoint("midStartCommit");
@@ -4322,7 +4582,7 @@
           try {
             // Because we incRef'd this commit point, above,
             // the file had better exist:
-            assert directory.fileExists(fileName);
+            assert directory.fileExists(fileName): "file '" + fileName + "' does not exist dir=" + directory;
             message("now sync " + fileName);
             directory.sync(fileName);
             success = true;
@@ -4355,11 +4615,7 @@
       // Wait now for any current pending commit to complete:
       while(pendingCommit != null) {
         message("wait for existing pendingCommit to finish...");
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
+        doWait();
       }

       if (segmentInfos.getGeneration() > toSync.getGeneration())