Index: src/test/org/apache/lucene/index/TestIndexWriter.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriter.java	(revision 688090)
+++ src/test/org/apache/lucene/index/TestIndexWriter.java	(working copy)
@@ -22,6 +22,7 @@
 import java.io.File;
 import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;

 import org.apache.lucene.util.LuceneTestCase;
@@ -125,7 +126,7 @@
         writer.close();
     }

-    private void addDoc(IndexWriter writer) throws IOException
+    private static void addDoc(IndexWriter writer) throws IOException
     {
         Document doc = new Document();
         doc.add(new Field("content", "aaa", Field.Store.NO, Field.Index.TOKENIZED));
@@ -437,6 +438,10 @@
                      (dir.getMaxUsedSizeInBytes()-startDiskUsage) < 2*(startDiskUsage + inputDiskUsage));
       }

+      // Make sure we don't hit disk full during close below:
+      dir.setMaxSizeInBytes(0);
+      dir.setRandomIOExceptionRate(0.0, 0);
+
       writer.close();

       // Wait for all BG threads to finish else
@@ -2338,6 +2343,9 @@
     IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
     ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+    // We expect AlreadyClosedException
+    cms.setSuppressExceptions();
+
     writer.setMergeScheduler(cms);
     writer.setMaxBufferedDocs(10);
     writer.setMergeFactor(4);
@@ -2891,7 +2899,7 @@
     writer.setMergeScheduler(new SerialMergeScheduler());
     writer.setMergePolicy(new LogDocMergePolicy());

-    Directory[] indexDirs = { dir};
+    Directory[] indexDirs = {new MockRAMDirectory(dir)};
     writer.addIndexes(indexDirs);
     writer.close();
   }
@@ -3732,6 +3740,278 @@
     dir.close();
   }

+  private abstract static class RunAddIndexesThreads {
+
+    Directory dir, dir2;
+    final static int NUM_INIT_DOCS = 17;
+    IndexWriter writer2;
+    final List failures = new ArrayList();
+    volatile boolean didClose;
+    final IndexReader[] readers;
+    final int NUM_COPY;
+    final static int NUM_THREADS = 5;
+    final Thread[] threads = new Thread[NUM_THREADS];
+    final ConcurrentMergeScheduler cms;
+
+    public RunAddIndexesThreads(int numCopy) throws Throwable {
+      NUM_COPY = numCopy;
+      dir = new MockRAMDirectory();
+      IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+      writer.setMaxBufferedDocs(2);
+      for (int i = 0; i < NUM_INIT_DOCS; i++)
+        addDoc(writer);
+      writer.close();
+
+      dir2 = new MockRAMDirectory();
+      writer2 = new IndexWriter(dir2, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+      cms = (ConcurrentMergeScheduler) writer2.getMergeScheduler();
+
+      readers = new IndexReader[NUM_COPY];
+      for(int i=0;i

Index: src/java/org/apache/lucene/index/SegmentInfos.java
===================================================================
--- src/java/org/apache/lucene/index/SegmentInfos.java	(revision 688090)
+++ src/java/org/apache/lucene/index/SegmentInfos.java	(working copy)
   * since counter, the old 1.x format, is always >= 0 */
   public static final int FORMAT = -1;
@@ -826,4 +825,19 @@
     prepareCommit(dir);
     finishCommit(dir);
   }
+
+  synchronized String segString(Directory directory) {
+    StringBuffer buffer = new StringBuffer();
+    final int count = size();
+    for(int i = 0; i < count; i++) {
+      if (i > 0) {
+        buffer.append(' ');
+      }
+      final SegmentInfo info = info(i);
+      buffer.append(info.segString(directory));
+      if (info.dir != directory)
+        buffer.append("**");
+    }
+    return buffer.toString();
+  }
 }
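A note on the TestIndexWriter hunks above: MockRAMDirectory's fault-injection hooks (setMaxSizeInBytes, setRandomIOExceptionRate) stay in effect until reset, so the new lines turn them off before close(), otherwise the teardown itself can hit a simulated disk-full and mask the result under test. A minimal sketch of that pattern, assuming the same test utilities as the patch (the field contents and exception rate are illustrative, not from the patch):

    public void testFaultInjectionPattern() throws IOException {
      MockRAMDirectory dir = new MockRAMDirectory();
      IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(),
                                           IndexWriter.MaxFieldLength.LIMITED);

      // Inject faults only while the operation under test runs:
      dir.setRandomIOExceptionRate(0.1, 0);
      try {
        Document doc = new Document();
        doc.add(new Field("content", "aaa", Field.Store.NO, Field.Index.TOKENIZED));
        writer.addDocument(doc);
      } catch (IOException ioe) {
        // Expected sometimes under fault injection; the index must stay consistent.
      }

      // Disable injection so close() cannot hit a simulated disk full:
      dir.setMaxSizeInBytes(0);
      dir.setRandomIOExceptionRate(0.0, 0);
      writer.close();
    }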
Index: src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- src/java/org/apache/lucene/index/IndexWriter.java	(revision 688090)
+++ src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -300,7 +300,7 @@

   private Similarity similarity = Similarity.getDefault(); // how to normalize

-  private volatile long changeCount; // increments every a change is completed
+  private volatile long changeCount; // increments every time a change is completed
   private long lastCommitChangeCount; // last changeCount that was committed

   private SegmentInfos rollbackSegmentInfos;      // segmentInfos we will fallback to if the commit fails
@@ -345,18 +345,58 @@
   private int flushDeletesCount;
   private double maxSyncPauseSeconds = DEFAULT_MAX_SYNC_PAUSE_SECONDS;

+  // Used to only allow one addIndexes to proceed at once
+  // TODO: use ReadWriteLock once we are on 5.0
+  private int readCount;          // count of how many threads are holding read lock
+  private Thread writeThread;     // non-null if any thread holds write lock
+
+  synchronized void acquireWrite() {
+    while(writeThread != null || readCount > 0)
+      doWait();
+
+    // We could have been closed while we were waiting:
+    ensureOpen();
+
+    writeThread = Thread.currentThread();
+  }
+
+  synchronized void releaseWrite() {
+    assert Thread.currentThread() == writeThread;
+    writeThread = null;
+    notifyAll();
+  }
+
+  synchronized void acquireRead() {
+    final Thread current = Thread.currentThread();
+    while(writeThread != null && writeThread != current)
+      doWait();
+
+    readCount++;
+  }
+
+  synchronized void releaseRead() {
+    readCount--;
+    assert readCount >= 0;
+    if (0 == readCount)
+      notifyAll();
+  }
+
   /**
    * Used internally to throw an {@link
    * AlreadyClosedException} if this IndexWriter has been
    * closed.
    * @throws AlreadyClosedException if this IndexWriter is closed
    */
-  protected final void ensureOpen() throws AlreadyClosedException {
-    if (closed) {
+  protected synchronized final void ensureOpen(boolean includePendingClose) throws AlreadyClosedException {
+    if (closed || (includePendingClose && closing)) {
       throw new AlreadyClosedException("this IndexWriter is closed");
     }
   }

+  protected synchronized final void ensureOpen() throws AlreadyClosedException {
+    ensureOpen(true);
+  }
+
   /**
    * Prints a message to the infoStream (if non-null),
    * prefixed with the identifying information for this
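The acquireWrite/acquireRead block above is a hand-rolled reader-writer lock, kept monitor-based because this code still targets Java 1.4; the TODO points at java.util.concurrent. For comparison, a sketch of the Java 5 replacement the TODO refers to (the field name is illustrative; note that ReentrantReadWriteLock also lets the write-lock holder take the read lock, matching the writeThread != current test above, though it would not run the extra ensureOpen() check acquireWrite performs after waking):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative only -- the patch keeps the hand-rolled monitor for Java 1.4.
    private final ReentrantReadWriteLock addIndexesLock = new ReentrantReadWriteLock();

    void acquireWrite() { addIndexesLock.writeLock().lock(); }   // one addIndexes at a time
    void releaseWrite() { addIndexesLock.writeLock().unlock(); }

    void acquireRead()  { addIndexesLock.readLock().lock(); }    // many concurrent readers
    void releaseRead()  { addIndexesLock.readLock().unlock(); }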
@@ -469,7 +509,8 @@
    * @see #setTermIndexInterval(int)
    */
   public int getTermIndexInterval() {
-    ensureOpen();
+    // We pass false because this method is called by SegmentMerger while we are in the process of closing
+    ensureOpen(false);
     return termIndexInterval;
   }

@@ -1127,7 +1168,7 @@
     }

     this.autoCommit = autoCommit;
-    setRollbackSegmentInfos();
+    setRollbackSegmentInfos(segmentInfos);

     docWriter = new DocumentsWriter(directory, this);
     docWriter.setInfoStream(infoStream);
@@ -1153,8 +1194,9 @@
     }
   }

-  private void setRollbackSegmentInfos() {
-    rollbackSegmentInfos = (SegmentInfos) segmentInfos.clone();
+  private synchronized void setRollbackSegmentInfos(SegmentInfos infos) {
+    rollbackSegmentInfos = (SegmentInfos) infos.clone();
+    assert !hasExternalSegments(rollbackSegmentInfos);
     rollbackSegments = new HashMap();
     final int size = rollbackSegmentInfos.size();
     for(int i=0;i

       if (mergeExceptions.size() > 0) {
         // Forward any exceptions in background merge
@@ -2310,6 +2353,12 @@
         }
       }
     }
+
+    // If close is called while we are still
+    // running, throw an exception so the calling
+    // thread will know the optimize did not
+    // complete
+    ensureOpen();
   }

   // NOTE: in the ConcurrentMergeScheduler case, when
@@ -2364,6 +2413,9 @@
       boolean running = true;
       while(running) {

+        // Check each merge that MergePolicy asked us to
+        // do, to see if any of them are still running and
+        // if any of them have hit an exception.
         running = false;

         for(int i=0;i

       while(runningMerges.size() > 0) {
         if (infoStream != null)
           message("now wait for " + runningMerges.size() + " running merge to abort");
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
+        doWait();
       }
+
+      stopMerges = false;
+      notifyAll();
+
+      assert 0 == mergingSegments.size();

       if (infoStream != null)
         message("all running merges have aborted");
+
     } else {
-      while(pendingMerges.size() > 0 || runningMerges.size() > 0) {
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-        }
-      }
+      // Ensure any running addIndexes finishes.  It's fine
+      // if a new one attempts to start because from our
+      // caller above the call will see that we are in the
+      // process of closing, and will throw an
+      // AlreadyClosedException.
+      acquireRead();
+      releaseRead();
+
+      while(pendingMerges.size() > 0 || runningMerges.size() > 0)
+        doWait();
+
       assert 0 == mergingSegments.size();
     }
   }
@@ -2747,6 +2885,31 @@
     deleter.checkpoint(segmentInfos, false);
   }

+  private void finishAddIndexes() {
+    releaseWrite();
+  }
+
+  private void blockAddIndexes(boolean includePendingClose) {
+
+    acquireRead();
+
+    boolean success = false;
+    try {
+
+      // Make sure we are still open since we could have
+      // waited quite a while for last addIndexes to finish
+      ensureOpen(includePendingClose);
+      success = true;
+    } finally {
+      if (!success)
+        releaseRead();
+    }
+  }
+
+  private void resumeAddIndexes() {
+    releaseRead();
+  }
+
   /** Merges all segments from an array of indexes into this index.
    *
   * <p>This may be used to parallelize batch indexing.  A large document
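The javadoc context above ("parallelize batch indexing") is the usage this whole patch hardens against concurrent close(). A sketch of that pattern, with worker threads building private sub-indexes that are then merged into the main index; the directory names, worker count, and mainDir variable are illustrative, and the enclosing method is assumed to declare the checked exceptions:

    final int NUM_WORKERS = 4;
    final Directory[] subDirs = new Directory[NUM_WORKERS];
    Thread[] workers = new Thread[NUM_WORKERS];

    for (int i = 0; i < NUM_WORKERS; i++) {
      final RAMDirectory sub = new RAMDirectory();
      subDirs[i] = sub;
      workers[i] = new Thread() {
        public void run() {
          try {
            IndexWriter w = new IndexWriter(sub, new WhitespaceAnalyzer(),
                                            IndexWriter.MaxFieldLength.LIMITED);
            // ... index this worker's share of the documents ...
            w.close();
          } catch (IOException ioe) {
            throw new RuntimeException(ioe);
          }
        }
      };
      workers[i].start();
    }
    for (int i = 0; i < NUM_WORKERS; i++)
      workers[i].join();   // InterruptedException declared by the caller

    IndexWriter main = new IndexWriter(mainDir, new WhitespaceAnalyzer(),
                                       IndexWriter.MaxFieldLength.LIMITED);
    main.addIndexes(subDirs);   // merge the sub-indexes into the main index
    main.close();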
@@ -2807,6 +2970,8 @@
     throws CorruptIndexException, IOException {

     ensureOpen();
+
+    noDupDirs(dirs);

     // Do not allow add docs or deletes while we are running:
     docWriter.pauseAllThreads();
@@ -2819,18 +2984,20 @@

     boolean success = false;

-    startTransaction();
+    startTransaction(false);

     try {
       int docCount = 0;
       synchronized(this) {
+        ensureOpen();
         for (int i = 0; i < dirs.length; i++) {
           SegmentInfos sis = new SegmentInfos();   // read infos from dir
           sis.read(dirs[i]);
           for (int j = 0; j < sis.size(); j++) {
             final SegmentInfo info = sis.info(j);
             docCount += info.docCount;
+            assert !segmentInfos.contains(info);
             segmentInfos.addElement(info); // add each info
           }
         }
@@ -2862,6 +3029,17 @@
     mergeGen++;
   }

+  private void noDupDirs(Directory[] dirs) {
+    HashSet dups = new HashSet();
+    for(int i=0;i<dirs.length;i++) {
+      if (dups.contains(dirs[i]))
+        throw new IllegalArgumentException("Directory " + dirs[i] + " appears more than once");
+      dups.add(dirs[i]);
+    }
+  }
+
@@ -2895,6 +3073,8 @@

     ensureOpen();

+    noDupDirs(dirs);
+
     // Do not allow add docs or deletes while we are running:
     docWriter.pauseAllThreads();
@@ -2905,12 +3085,14 @@

     boolean success = false;

-    startTransaction();
+    startTransaction(false);

     try {
       int docCount = 0;

       synchronized(this) {
+        ensureOpen();
+
         for (int i = 0; i < dirs.length; i++) {
           if (directory == dirs[i]) {
             // cannot add this index: segments may be deleted in merge before added
@@ -2921,6 +3103,7 @@
         sis.read(dirs[i]);
         for (int j = 0; j < sis.size(); j++) {
           SegmentInfo info = sis.info(j);
+          assert !segmentInfos.contains(info): "dup info dir=" + info.dir + " name=" + info.name;
           docCount += info.docCount;
           segmentInfos.addElement(info); // add each info
         }
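The repeated ensureOpen()/startTransaction(false) calls above are what keep addIndexes all-or-nothing even when it races with close(). From the caller's side that contract looks like this sketch (rollback() discards everything done since the last commit; the directory names are illustrative):

    IndexWriter writer = new IndexWriter(mainDir, new WhitespaceAnalyzer(),
                                         IndexWriter.MaxFieldLength.LIMITED);
    boolean success = false;
    try {
      writer.addIndexesNoOptimize(new Directory[] { subDir1, subDir2 });
      success = true;
    } finally {
      if (success)
        writer.close();       // keeps the merged segments
      else
        writer.rollback();    // transactional: index reverts to its last commit
    }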
@@ -2932,13 +3115,17 @@

       maybeMerge();

+      ensureOpen();
+
       // If after merging there remain segments in the index
       // that are in a different directory, just copy these
       // over into our index.  This is necessary (before
       // finishing the transaction) to avoid leaving the
       // index in an unusable (inconsistent) state.
-      copyExternalSegments();
+      resolveExternalSegments();

+      ensureOpen();
+
       success = true;

     } finally {
@@ -2956,47 +3143,85 @@
     }
   }

+  private boolean hasExternalSegments() {
+    return hasExternalSegments(segmentInfos);
+  }
+
+  private boolean hasExternalSegments(SegmentInfos infos) {
+    final int numSegments = infos.size();
+    for(int i=0;i<numSegments;i++)
+      if (infos.info(i).dir != directory)
+        return true;
+    return false;
+  }

    */
+
+  private boolean committing;
+
+  synchronized private void waitForCommit() {
+    // Only allow a single thread to do the commit, at a time:
+    while(committing)
+      doWait();
+    committing = true;
+  }
+
+  synchronized private void doneCommit() {
+    committing = false;
+    notifyAll();
+  }
+
   public final void commit() throws CorruptIndexException, IOException {

-    message("commit: start");
+    ensureOpen();

-    if (autoCommit || pendingCommit == null) {
-      message("commit: now prepare");
-      prepareCommit(true);
-    } else
-      message("commit: already prepared");
+    // Only let one thread do the prepare/finish at a time
+    waitForCommit();

-    finishCommit();
+    try {
+      message("commit: start");
+
+      if (autoCommit || pendingCommit == null) {
+        message("commit: now prepare");
+        prepareCommit(true);
+      } else
+        message("commit: already prepared");
+
+      finishCommit();
+    } finally {
+      doneCommit();
+    }
   }

   private synchronized final void finishCommit() throws CorruptIndexException, IOException {
@@ -3234,7 +3525,7 @@
       pendingCommit.finishCommit(directory);
       lastCommitChangeCount = pendingCommitChangeCount;
       segmentInfos.updateGeneration(pendingCommit);
-      setRollbackSegmentInfos();
+      setRollbackSegmentInfos(pendingCommit);
       deleter.checkpoint(pendingCommit, true);
     } finally {
       deleter.decRef(pendingCommit);
@@ -3259,7 +3550,8 @@
    * be flushed */
   protected final void flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException {
-    ensureOpen();
+    // We can be called during close, when closing==true, so we must pass false to ensureOpen:
+    ensureOpen(false);
     if (doFlush(flushDocStores, flushDeletes) && triggerMerge)
       maybeMerge();
   }
@@ -3269,12 +3561,14 @@
   // even while a flush is happening
   private synchronized final boolean doFlush(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException {

-    // Make sure no threads are actively adding a document
+    ensureOpen(false);

     assert testPoint("startDoFlush");

     flushCount++;

+    // Make sure no threads are actively adding a document
+
     flushDeletes |= docWriter.deletesFull();

     // When autoCommit=true we must always flush deletes
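The committing flag above serializes concurrent commit() calls; internally commit() is prepare-then-finish. Roughly the same two-phase shape is available to applications through the public 2.4-era entry points, as in this sketch (error handling is simplified, and the external-store coordination step is an assumed usage, not something this patch prescribes):

    try {
      writer.prepareCommit();   // phase 1: flush and fsync the new files
    } catch (IOException ioe) {
      writer.rollback();        // abandon the half-prepared commit
      throw ioe;
    }
    // ... make the matching commit durable elsewhere (e.g. a database) ...
    writer.commit();            // phase 2: publish the prepared commit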
@@ -3456,7 +3750,7 @@
       if (segmentInfos.indexOf(info) == -1)
         throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the index", directory);
       else
-        throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge + " vs " + segString() + "), which IndexWriter (currently) cannot handle",
+        throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge.segString(directory) + " vs " + segString() + "), which IndexWriter (currently) cannot handle",
                                              directory);
     }
   }
@@ -3470,7 +3764,7 @@
    * deletes may have been flushed to the segments since
    * the merge was started.  This method "carries over"
    * such new deletes onto the newly merged segment, and
-   * saves the results deletes file (incrementing the
+   * saves the resulting deletes file (incrementing the
    * delete generation for merge.info).  If no deletes were
    * flushed, no new deletes file is saved. */
   synchronized private void commitMergedDeletes(MergePolicy.OneMerge merge) throws IOException {
@@ -3573,7 +3867,7 @@
       return false;

     if (infoStream != null)
-      message("commitMerge: " + merge.segString(directory));
+      message("commitMerge: " + merge.segString(directory) + " index=" + segString());

     assert merge.registerDone;
@@ -3619,6 +3913,7 @@
     merge.info.setHasProx(merger.hasProx());

     segmentInfos.subList(start, start + merge.segments.size()).clear();
+    assert !segmentInfos.contains(merge.info);
     segmentInfos.add(start, merge.info);

     // Must checkpoint before decrefing so any newly
@@ -3663,13 +3958,14 @@
       mergeInit(merge);

       if (infoStream != null)
-        message("now merge\n  merge=" + merge.segString(directory) + "\n  index=" + segString());
+        message("now merge\n  merge=" + merge.segString(directory) + "\n  merge=" + merge + "\n  index=" + segString());

       mergeMiddle(merge);
       success = true;
     } catch (MergePolicy.MergeAbortedException e) {
       merge.setException(e);
       addMergeException(merge);
+
       // We can ignore this exception, unless the merge
       // involves segments from external directories, in
       // which case we must throw it so, for example, the
@@ -3699,10 +3995,6 @@
         updatePendingMerges(merge.maxNumSegmentsOptimize, merge.optimize);
     } finally {
       runningMerges.remove(merge);
-      // Optimize may be waiting on the final optimize
-      // merge to finish; and finishMerges() may be
-      // waiting for all merges to finish:
-      notifyAll();
     }
   }
 }
@@ -3718,11 +4010,16 @@
    * are now participating in a merge, and true is
    * returned.  Else (the merge conflicts) false is
    * returned. */
-  final synchronized boolean registerMerge(MergePolicy.OneMerge merge) {
+  final synchronized boolean registerMerge(MergePolicy.OneMerge merge) throws MergePolicy.MergeAbortedException {

     if (merge.registerDone)
       return true;

+    if (stopMerges) {
+      merge.abort();
+      throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString(directory));
+    }
+
     final int count = merge.segments.size();
     boolean isExternal = false;
     for(int i=0;i

       if (i > 0) {
         buffer.append(' ');
       }
-      buffer.append(infos.info(i).segString(directory));
+      final SegmentInfo info = infos.info(i);
+      buffer.append(info.segString(directory));
+      if (info.dir != directory)
+        buffer.append("**");
     }
     return buffer.toString();
   }
@@ -4258,6 +4574,20 @@
     }
   }

+  private synchronized void doWait() {
+    try {
+      // NOTE: the callers of this method should in theory
+      // be able to do simply wait(), but, as a defense
+      // against thread timing hazards where notifyAll()
+      // fails to be called, we wait for at most 1 second
+      // and then return so the caller can check if wait
+      // conditions are satisfied:
+      wait(1000);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
   /** Walk through all files referenced by the current
    * segmentInfos and ask the Directory to sync each file,
    * if it wasn't already.  If that succeeds, then we
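doWait() above is the classic guarded-wait idiom with a bounded wait(1000) as a defense in depth: every caller loops re-checking its condition, so a timed-out or spurious wakeup is harmless and a missed notifyAll() costs at most a second. The generic shape of each call site, as a sketch (conditionSatisfied() stands in for checks like pendingMerges.size() == 0):

    synchronized void awaitCondition() {
      // The condition is always re-tested after waking, so returning
      // early from wait() can never produce a wrong result.
      while (!conditionSatisfied()) {
        try {
          wait(1000);   // bounded: recovers even if a notifyAll() is missed
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();   // preserve interrupt status
          return;       // give up waiting; the caller decides how to proceed
        }
      }
    }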
@@ -4283,26 +4613,38 @@

     synchronized(this) {

-      assert lastCommitChangeCount <= changeCount;
+      // Wait for any running addIndexes to complete
+      // first, then block any from running until we've
+      // copied the segmentInfos we intend to sync:
+      blockAddIndexes(false);

-      if (changeCount == lastCommitChangeCount) {
-        if (infoStream != null)
-          message("  skip startCommit(): no changes pending");
-        return;
-      }
+      assert !hasExternalSegments();

-      // First, we clone & incref the segmentInfos we intend
-      // to sync, then, without locking, we sync() each file
-      // referenced by toSync, in the background.  Multiple
-      // threads can be doing this at once, if say a large
-      // merge and a small merge finish at the same time:
+      try {

-      if (infoStream != null)
-        message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
+        assert lastCommitChangeCount <= changeCount;

-      toSync = (SegmentInfos) segmentInfos.clone();
-      deleter.incRef(toSync, false);
-      myChangeCount = changeCount;
+        if (changeCount == lastCommitChangeCount) {
+          if (infoStream != null)
+            message("  skip startCommit(): no changes pending");
+          return;
+        }
+
+        // First, we clone & incref the segmentInfos we intend
+        // to sync, then, without locking, we sync() each file
+        // referenced by toSync, in the background.  Multiple
+        // threads can be doing this at once, if say a large
+        // merge and a small merge finish at the same time:
+
+        if (infoStream != null)
+          message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
+
+        toSync = (SegmentInfos) segmentInfos.clone();
+        deleter.incRef(toSync, false);
+        myChangeCount = changeCount;
+      } finally {
+        resumeAddIndexes();
+      }
     }

     assert testPoint("midStartCommit");
@@ -4326,7 +4668,7 @@
       try {
         // Because we incRef'd this commit point, above,
         // the file had better exist:
-        assert directory.fileExists(fileName);
+        assert directory.fileExists(fileName): "file '" + fileName + "' does not exist dir=" + directory;
         message("now sync " + fileName);
         directory.sync(fileName);
         success = true;
@@ -4359,11 +4701,7 @@
       // Wait now for any current pending commit to complete:
       while(pendingCommit != null) {
         message("wait for existing pendingCommit to finish...");
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
+        doWait();
       }

       if (segmentInfos.getGeneration() > toSync.getGeneration())